Skip to content

Add Delta Lake version annotations to Microsoft.Spark.Extensions.Delta APIs. #632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;

namespace Microsoft.Spark.Extensions.Delta
{
    /// <summary>
    /// Marks an API with the Delta Lake release in which it first became available.
    /// </summary>
    /// <remarks>
    /// The version string is handed unchanged to the <see cref="VersionAttribute"/>
    /// base constructor.
    /// </remarks>
    [AttributeUsage(AttributeTargets.All)]
    public sealed class DeltaLakeSinceAttribute : VersionAttribute
    {
        /// <summary>
        /// Creates the attribute for the given Delta Lake version.
        /// </summary>
        /// <param name="version">Delta Lake version string, e.g. "0.3.0".</param>
        public DeltaLakeSinceAttribute(string version) : base(version) { }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

namespace Microsoft.Spark.Extensions.Delta
{
    /// <summary>
    /// Delta Lake version strings, used as arguments when annotating APIs with the
    /// release they were introduced in.
    /// </summary>
    internal static class DeltaLakeVersions
    {
        // Each constant name mirrors its value: V<major>_<minor>_<patch>.
        internal const string V0_1_0 = "0.1.0";
        internal const string V0_2_0 = "0.2.0";
        internal const string V0_3_0 = "0.3.0";
        internal const string V0_4_0 = "0.4.0";
        internal const string V0_5_0 = "0.5.0";

        // 0.6.x releases.
        internal const string V0_6_0 = "0.6.0";
        internal const string V0_6_1 = "0.6.1";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
/// </code>
/// </example>
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public class DeltaMergeBuilder : IJvmObjectReferenceProvider
{
private readonly JvmObjectReference _jvmObject;
Expand All @@ -92,6 +93,7 @@ internal DeltaMergeBuilder(JvmObjectReference jvmObject)
/// delete the matched target table row with the source row.
/// </summary>
/// <returns>DeltaMergeMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeMatchedActionBuilder WhenMatched()
{
    // Invoke "whenMatched" on the wrapped JVM object and wrap the returned reference.
    var matchedRef = (JvmObjectReference)_jvmObject.Invoke("whenMatched");
    return new DeltaMergeMatchedActionBuilder(matchedRef);
}
Expand All @@ -104,6 +106,7 @@ public DeltaMergeMatchedActionBuilder WhenMatched() =>
/// </summary>
/// <param name="condition">Boolean expression as a SQL formatted string.</param>
/// <returns>DeltaMergeMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeMatchedActionBuilder WhenMatched(string condition)
{
    // Forward the SQL-formatted condition to "whenMatched" and wrap the result.
    var matchedRef = (JvmObjectReference)_jvmObject.Invoke("whenMatched", condition);
    return new DeltaMergeMatchedActionBuilder(matchedRef);
}
Expand All @@ -116,6 +119,7 @@ public DeltaMergeMatchedActionBuilder WhenMatched(string condition) =>
/// </summary>
/// <param name="condition">Boolean expression as a Column object.</param>
/// <returns>DeltaMergeMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeMatchedActionBuilder WhenMatched(Column condition)
{
    // Forward the Column condition to "whenMatched" and wrap the result.
    var matchedRef = (JvmObjectReference)_jvmObject.Invoke("whenMatched", condition);
    return new DeltaMergeMatchedActionBuilder(matchedRef);
}
Expand All @@ -126,6 +130,7 @@ public DeltaMergeMatchedActionBuilder WhenMatched(Column condition) =>
/// new sourced row into the target table.
/// </summary>
/// <returns>DeltaMergeNotMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeNotMatchedActionBuilder WhenNotMatched()
{
    // Invoke "whenNotMatched" on the wrapped JVM object and wrap the returned reference.
    var notMatchedRef = (JvmObjectReference)_jvmObject.Invoke("whenNotMatched");
    return new DeltaMergeNotMatchedActionBuilder(notMatchedRef);
}
Expand All @@ -137,6 +142,7 @@ public DeltaMergeNotMatchedActionBuilder WhenNotMatched() =>
/// </summary>
/// <param name="condition">Boolean expression as a SQL formatted string.</param>
/// <returns>DeltaMergeNotMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeNotMatchedActionBuilder WhenNotMatched(string condition)
{
    // Forward the SQL-formatted condition to "whenNotMatched" and wrap the result.
    var notMatchedRef =
        (JvmObjectReference)_jvmObject.Invoke("whenNotMatched", condition);
    return new DeltaMergeNotMatchedActionBuilder(notMatchedRef);
}
Expand All @@ -148,13 +154,15 @@ public DeltaMergeNotMatchedActionBuilder WhenNotMatched(string condition) =>
/// </summary>
/// <param name="condition">Boolean expression as a Column object.</param>
/// <returns>DeltaMergeNotMatchedActionBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeNotMatchedActionBuilder WhenNotMatched(Column condition)
{
    // Forward the Column condition to "whenNotMatched" and wrap the result.
    var notMatchedRef =
        (JvmObjectReference)_jvmObject.Invoke("whenNotMatched", condition);
    return new DeltaMergeNotMatchedActionBuilder(notMatchedRef);
}

/// <summary>
/// Runs the merge using the matched and not-matched actions configured on this builder.
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Execute()
{
    // The result of the "execute" call is intentionally discarded.
    _jvmObject.Invoke("execute");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
/// Builder class to specify the actions to perform when a target table row has matched a
/// source row based on the given merge condition and optional match condition.
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public class DeltaMergeMatchedActionBuilder : IJvmObjectReferenceProvider
{
private readonly JvmObjectReference _jvmObject;
Expand All @@ -29,6 +30,7 @@ internal DeltaMergeMatchedActionBuilder(JvmObjectReference jvmObject)
/// <param name="set">Rules to update a row as a map between target column names and
/// corresponding update expressions as Column objects.</param>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder Update(Dictionary<string, Column> set)
{
    // Pass the column-name -> Column map to "update" and wrap the returned reference.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("update", set);
    return new DeltaMergeBuilder(builderRef);
}

Expand All @@ -38,6 +40,7 @@ public DeltaMergeBuilder Update(Dictionary<string, Column> set) =>
/// <param name="set">Rules to update a row as a map between target column names and
/// corresponding update expressions as SQL formatted strings.</param>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder UpdateExpr(Dictionary<string, string> set)
{
    // Pass the column-name -> SQL-expression map to "updateExpr" and wrap the result.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("updateExpr", set);
    return new DeltaMergeBuilder(builderRef);
}

Expand All @@ -46,13 +49,15 @@ public DeltaMergeBuilder UpdateExpr(Dictionary<string, string> set) =>
/// columns in the source row.
/// </summary>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder UpdateAll()
{
    // Invoke "updateAll" with no arguments and wrap the returned reference.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("updateAll");
    return new DeltaMergeBuilder(builderRef);
}

/// <summary>
/// Specifies that a matched row is deleted from the table.
/// </summary>
/// <returns>DeltaMergeBuilder wrapping the result of the "delete" call.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder Delete()
{
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("delete");
    return new DeltaMergeBuilder(builderRef);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
///
/// See <see cref="DeltaMergeBuilder"/> for more information.
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public class DeltaMergeNotMatchedActionBuilder : IJvmObjectReferenceProvider
{
private readonly JvmObjectReference _jvmObject;
Expand All @@ -32,6 +33,7 @@ internal DeltaMergeNotMatchedActionBuilder(JvmObjectReference jvmObject)
/// <param name="values">Rules to insert a row as a map between target column names and
/// corresponding expressions as Column objects.</param>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder Insert(Dictionary<string, Column> values)
{
    // Pass the column-name -> Column map to "insert" and wrap the returned reference.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("insert", values);
    return new DeltaMergeBuilder(builderRef);
}

Expand All @@ -41,6 +43,7 @@ public DeltaMergeBuilder Insert(Dictionary<string, Column> values) =>
/// <param name="values">Rules to insert a row as a map between target column names and
/// corresponding expressions as SQL formatted strings.</param>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder InsertExpr(Dictionary<string, string> values)
{
    // Pass the column-name -> SQL-expression map to "insertExpr" and wrap the result.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("insertExpr", values);
    return new DeltaMergeBuilder(builderRef);
}

Expand All @@ -49,6 +52,7 @@ public DeltaMergeBuilder InsertExpr(Dictionary<string, string> values) =>
/// corresponding columns in the source row.
/// </summary>
/// <returns>DeltaMergeBuilder object.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder InsertAll()
{
    // Invoke "insertAll" with no arguments and wrap the returned reference.
    var builderRef = (JvmObjectReference)_jvmObject.Invoke("insertAll");
    return new DeltaMergeBuilder(builderRef);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
/// DeltaTable.ForPath(sparkSession, pathToTheDeltaTable)
/// </code>
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public class DeltaTable : IJvmObjectReferenceProvider
{
private readonly JvmObjectReference _jvmObject;
Expand Down Expand Up @@ -56,6 +57,7 @@ internal DeltaTable(JvmObjectReference jvmObject)
/// <param name="identifier">String used to identify the parquet table.</param>
/// <param name="partitionSchema">StructType representing the partition schema.</param>
/// <returns>The converted DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_4_0)]
public static DeltaTable ConvertToDelta(
SparkSession spark,
string identifier,
Expand Down Expand Up @@ -86,6 +88,7 @@ public static DeltaTable ConvertToDelta(
/// <param name="identifier">String used to identify the parquet table.</param>
/// <param name="partitionSchema">String representing the partition schema.</param>
/// <returns>The converted DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_4_0)]
public static DeltaTable ConvertToDelta(
SparkSession spark,
string identifier,
Expand Down Expand Up @@ -114,6 +117,7 @@ public static DeltaTable ConvertToDelta(
/// <param name="spark">The relevant session.</param>
/// <param name="identifier">String used to identify the parquet table.</param>
/// <returns>The converted DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_4_0)]
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
Expand All @@ -131,6 +135,7 @@ public static DeltaTable ConvertToDelta(SparkSession spark, string identifier) =
/// </summary>
/// <param name="path">The path to the data.</param>
/// <returns>DeltaTable loaded from the path.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public static DeltaTable ForPath(string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
Expand All @@ -145,6 +150,7 @@ public static DeltaTable ForPath(string path) =>
/// <param name="sparkSession">The active SparkSession.</param>
/// <param name="path">The path to the data.</param>
/// <returns>DeltaTable loaded from the path.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
Expand All @@ -165,6 +171,7 @@ public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
/// <param name="sparkSession">The relevant session.</param>
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
/// <returns>True if the table is a DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_4_0)]
public static bool IsDeltaTable(SparkSession sparkSession, string identifier) =>
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
Expand All @@ -187,6 +194,7 @@ public static bool IsDeltaTable(SparkSession sparkSession, string identifier) =>
/// </summary>
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
/// <returns>True if the table is a DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_4_0)]
public static bool IsDeltaTable(string identifier) =>
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
Expand All @@ -199,6 +207,7 @@ public static bool IsDeltaTable(string identifier) =>
/// </summary>
/// <param name="alias">The table alias.</param>
/// <returns>Aliased DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaTable As(string alias)
{
    // Invoke "as" with the alias and wrap the returned reference in a new DeltaTable.
    var aliasedRef = (JvmObjectReference)_jvmObject.Invoke("as", alias);
    return new DeltaTable(aliasedRef);
}

Expand All @@ -208,13 +217,15 @@ public DeltaTable As(string alias) =>
/// </summary>
/// <param name="alias">The table alias.</param>
/// <returns>Aliased DeltaTable.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaTable Alias(string alias)
{
    // Invoke "alias" with the alias and wrap the returned reference in a new DeltaTable.
    var aliasedRef = (JvmObjectReference)_jvmObject.Invoke("alias", alias);
    return new DeltaTable(aliasedRef);
}

/// <summary>
/// Returns this Delta table as a DataFrame (that is, Dataset[Row]).
/// </summary>
/// <returns>DataFrame wrapping the result of the "toDF" call.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DataFrame ToDF()
{
    var frameRef = (JvmObjectReference)_jvmObject.Invoke("toDF");
    return new DataFrame(frameRef);
}

/// <summary>
Expand All @@ -226,6 +237,7 @@ public DeltaTable Alias(string alias) =>
/// table for reading versions earlier than this will be preserved and the rest of them
/// will be deleted.</param>
/// <returns>Vacuumed DataFrame.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DataFrame Vacuum(double retentionHours)
{
    // Forward the retention period to "vacuum" and wrap the returned reference.
    var frameRef = (JvmObjectReference)_jvmObject.Invoke("vacuum", retentionHours);
    return new DataFrame(frameRef);
}

Expand All @@ -237,6 +249,7 @@ public DataFrame Vacuum(double retentionHours) =>
/// Note: This will use the default retention period of 7 days.
/// </summary>
/// <returns>Vacuumed DataFrame.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DataFrame Vacuum()
{
    // Invoke the parameterless "vacuum" and wrap the returned reference.
    var frameRef = (JvmObjectReference)_jvmObject.Invoke("vacuum");
    return new DataFrame(frameRef);
}

Expand All @@ -246,6 +259,7 @@ public DataFrame Vacuum() =>
/// </summary>
/// <param name="limit">The number of previous commands to get history for.</param>
/// <returns>History DataFrame.</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DataFrame History(int limit)
{
    // Forward the limit to "history" and wrap the returned reference.
    var frameRef = (JvmObjectReference)_jvmObject.Invoke("history", limit);
    return new DataFrame(frameRef);
}

Expand All @@ -254,6 +268,7 @@ public DataFrame History(int limit) =>
/// information is in reverse chronological order.
/// </summary>
/// <returns>History DataFrame</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DataFrame History()
{
    // Invoke the parameterless "history" and wrap the returned reference.
    var frameRef = (JvmObjectReference)_jvmObject.Invoke("history");
    return new DataFrame(frameRef);
}

Expand All @@ -265,23 +280,27 @@ public DataFrame History() =>
/// - "symlink_format_manifest" : This will generate manifests in symlink format
/// for Presto and Athena read support.
/// See the online documentation for more information.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_5_0)]
public void Generate(string mode)
{
    // Forward the mode string to "generate"; the call's result is discarded.
    _jvmObject.Invoke("generate", mode);
}

/// <summary>
/// Deletes the rows of the table that satisfy the given <c>condition</c>.
/// </summary>
/// <param name="condition">Boolean SQL expression.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Delete(string condition)
{
    _jvmObject.Invoke("delete", condition);
}

/// <summary>
/// Deletes the rows of the table that satisfy the given <c>condition</c>.
/// </summary>
/// <param name="condition">Boolean expression as a Column object.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Delete(Column condition)
{
    _jvmObject.Invoke("delete", condition);
}

/// <summary>
/// Deletes data from the table; no condition is supplied.
/// </summary>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Delete()
{
    _jvmObject.Invoke("delete");
}

/// <summary>
Expand All @@ -297,6 +316,7 @@ public DataFrame History() =>
/// </example>
/// <param name="set">Rules to update a row as a Scala map between target column names
/// and corresponding update expressions as Column objects.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Update(Dictionary<string, Column> set)
{
    // Forward the column-name -> Column map to "update"; the result is discarded.
    _jvmObject.Invoke("update", set);
}

/// <summary>
Expand All @@ -317,6 +337,7 @@ public DataFrame History() =>
/// to update.</param>
/// <param name="set">Rules to update a row as a Scala map between target column names and
/// corresponding update expressions as Column objects.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void Update(Column condition, Dictionary<string, Column> set)
{
    // Forward the condition and the update map, in that order, to "update".
    _jvmObject.Invoke("update", condition, set);
}

Expand All @@ -334,6 +355,7 @@ public void Update(Column condition, Dictionary<string, Column> set) =>
/// </example>
/// <param name="set">Rules to update a row as a Scala map between target column names and
/// corresponding update expressions as SQL formatted strings.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void UpdateExpr(Dictionary<string, string> set)
{
    // Forward the column-name -> SQL-expression map to "updateExpr".
    _jvmObject.Invoke("updateExpr", set);
}

Expand All @@ -355,6 +377,7 @@ public void UpdateExpr(Dictionary<string, string> set) =>
/// which rows to update.</param>
/// <param name="set">Rules to update a row as a map between target column names and
/// corresponding update expressions as SQL formatted strings.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public void UpdateExpr(string condition, Dictionary<string, string> set)
{
    // Forward the condition and the update map, in that order, to "updateExpr".
    _jvmObject.Invoke("updateExpr", condition, set);
}

Expand Down Expand Up @@ -395,6 +418,7 @@ public void UpdateExpr(string condition, Dictionary<string, string> set) =>
/// <param name="source">Source Dataframe to be merged.</param>
/// <param name="condition">Boolean expression as SQL formatted string.</param>
/// <returns>DeltaMergeBuilder</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder Merge(DataFrame source, string condition) =>
new DeltaMergeBuilder(
(JvmObjectReference)_jvmObject.Invoke(
Expand Down Expand Up @@ -436,6 +460,7 @@ public DeltaMergeBuilder Merge(DataFrame source, string condition) =>
/// <param name="source">Source Dataframe to be merged.</param>
/// <param name="condition">Boolean expression as a Column object.</param>
/// <returns>DeltaMergeBuilder</returns>
[DeltaLakeSince(DeltaLakeVersions.V0_3_0)]
public DeltaMergeBuilder Merge(DataFrame source, Column condition) =>
new DeltaMergeBuilder(
(JvmObjectReference)_jvmObject.Invoke(
Expand Down