Skip to content

Commit 56eb806

Browse files
authored
Merge branch 'main' into columnApis3.1
2 parents 997b2b1 + 469f260 commit 56eb806

File tree

5 files changed

+113
-0
lines changed

5 files changed

+113
-0
lines changed

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamReaderTests.cs

+23
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
using System.Collections.Generic;
66
using System.IO;
7+
using System.Linq;
8+
using Microsoft.Spark.E2ETest.Utils;
79
using Microsoft.Spark.Sql;
810
using Microsoft.Spark.Sql.Streaming;
911
using Microsoft.Spark.Sql.Types;
1012
using Xunit;
13+
using static Microsoft.Spark.E2ETest.Utils.SQLUtils;
1114

1215
namespace Microsoft.Spark.E2ETest.IpcTests
1316
{
@@ -69,5 +72,25 @@ public void TestSignaturesV2_3_X()
6972
// needs to be set in these scenarios as well.
7073
Assert.IsType<DataFrame>(dsr.Format("json").Option("path", jsonFilePath).Load());
7174
}
75+
76+
/// <summary>
/// Test signatures for APIs introduced in Spark 3.1.*.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V3_1_0)]
public void TestSignaturesV3_1_X()
{
    string tableName = "input_table";
    WithTable(
        _spark,
        new string[] { tableName },
        () =>
        {
            // Back a temp view with an in-memory stream of the ints 1..10,
            // then verify DataStreamReader.Table() returns a DataFrame for it.
            var source = new MemoryStream<int>(_spark);
            source.AddData(Enumerable.Range(1, 10).ToArray());
            source.ToDF().CreateOrReplaceTempView(tableName);

            DataStreamReader reader = _spark.ReadStream();
            Assert.IsType<DataFrame>(reader.Table(tableName));
        });
}
7295
}
7396
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs

+24
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Microsoft.Spark.Sql.Types;
1313
using Microsoft.Spark.UnitTest.TestUtils;
1414
using Xunit;
15+
using static Microsoft.Spark.E2ETest.Utils.SQLUtils;
1516
using static Microsoft.Spark.Sql.Functions;
1617

1718
namespace Microsoft.Spark.E2ETest.IpcTests
@@ -67,6 +68,29 @@ public void TestSignaturesV2_3_X()
6768
Assert.IsType<DataStreamWriter>(dsw.Trigger(Trigger.Once()));
6869
}
6970

71+
/// <summary>
/// Test signatures for APIs introduced in Spark 3.1.*.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V3_1_0)]
public void TestSignaturesV3_1_X()
{
    string tableName = "output_table";
    WithTable(
        _spark,
        new string[] { tableName },
        () =>
        {
            // Checkpoint location is required for a streaming write; use a
            // throwaway directory that is cleaned up with the test.
            using var checkpointDir = new TemporaryDirectory();
            var source = new MemoryStream<int>(_spark);
            DataStreamWriter writer = source
                .ToDF()
                .WriteStream()
                .Format("parquet")
                .Option("checkpointLocation", checkpointDir.Path);

            // ToTable() starts the query writing into the managed table.
            Assert.IsType<StreamingQuery>(writer.ToTable(tableName));
        });
}
93+
7094
[SkipIfSparkVersionIsLessThan(Versions.V2_4_0)]
7195
public void TestForeachBatch()
7296
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Linq;
8+
using Microsoft.Spark.Sql;
9+
10+
namespace Microsoft.Spark.E2ETest.Utils
11+
{
12+
internal static class SQLUtils
{
    /// <summary>
    /// Runs <paramref name="action"/> and then drops every table named in
    /// <paramref name="tableNames"/>, even when the action throws.
    /// </summary>
    /// <param name="spark">The <see cref="SparkSession"/></param>
    /// <param name="tableNames">Names of the tables to drop</param>
    /// <param name="action"><see cref="Action"/> to execute.</param>
    public static void WithTable(SparkSession spark, IEnumerable<string> tableNames, Action action)
    {
        try
        {
            action();
        }
        finally
        {
            // Best-effort cleanup: IF EXISTS makes the drop a no-op for
            // tables the action never created.
            foreach (string name in tableNames)
            {
                spark.Sql($"DROP TABLE IF EXISTS {name}");
            }
        }
    }
}
32+
}

src/csharp/Microsoft.Spark/Sql/Streaming/DataStreamReader.cs

+10
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,16 @@ public DataFrame Load(string path) =>
166166
/// <returns>DataFrame object</returns>
167167
public DataFrame Parquet(string path) => LoadSource("parquet", path);
168168

169+
/// <summary>
/// Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
/// support streaming mode.
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <returns>DataFrame object</returns>
[Since(Versions.V3_1_0)]
public DataFrame Table(string tableName)
{
    // Delegate to the JVM-side DataStreamReader.table(...) and wrap the result.
    var reference = (JvmObjectReference)_jvmObject.Invoke("table", tableName);
    return new DataFrame(reference);
}
178+
169179
/// <summary>
170180
/// Loads text files and returns a <see cref="DataFrame"/> whose schema starts
171181
/// with a string column named "value", and followed by partitioned columns

src/csharp/Microsoft.Spark/Sql/Streaming/DataStreamWriter.cs

+24
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,30 @@ public StreamingQuery Start(string path = null)
178178
return new StreamingQuery((JvmObjectReference)_jvmObject.Invoke("start"));
179179
}
180180

181+
/// <summary>
182+
/// Starts the execution of the streaming query, which will continually output results to the
183+
/// given table as new data arrives. The returned <see cref="StreamingQuery"/> object can be
184+
/// used to interact with the stream.
185+
/// </summary>
186+
/// <remarks>
187+
/// For v1 table, partitioning columns provided by <see cref="PartitionBy(string[])"/> will be
188+
/// respected no matter the table exists or not. A new table will be created if the table not
189+
/// exists.
190+
///
191+
/// For v2 table, <see cref="PartitionBy(string[])"/> will be ignored if the table already exists.
192+
/// <see cref="PartitionBy(string[])"/> will be respected only if the v2 table does not exist.
193+
/// Besides, the v2 table created by this API lacks some functionalities (e.g., customized
194+
/// properties, options, and serde info). If you need them, please create the v2 table manually
195+
/// before the execution to avoid creating a table with incomplete information.
196+
/// </remarks>
197+
/// <param name="tableName">Name of the table</param>
198+
/// <returns>StreamingQuery object</returns>
199+
[Since(Versions.V3_1_0)]
200+
public StreamingQuery ToTable(string tableName)
201+
{
202+
return new StreamingQuery((JvmObjectReference)_jvmObject.Invoke("toTable", tableName));
203+
}
204+
181205
/// <summary>
182206
/// Sets the output of the streaming query to be processed using the provided
183207
/// writer object. See <see cref="IForeachWriter"/> for more details on the

0 commit comments

Comments
 (0)