Skip to content

Commit e50d7b2

Browse files
authored
Expose DataStreamWriter.PartitionBy(). (#270)
1 parent 259de48 commit e50d7b2

File tree

3 files changed

+75
-7
lines changed

3 files changed

+75
-7
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using Microsoft.Spark.Sql;
7+
using Microsoft.Spark.Sql.Streaming;
8+
using Xunit;
9+
10+
namespace Microsoft.Spark.E2ETest.IpcTests
11+
{
12+
[Collection("Spark E2E Tests")]
13+
public class DataStreamWriterTests
14+
{
15+
private readonly SparkSession _spark;
16+
17+
public DataStreamWriterTests(SparkFixture fixture)
18+
{
19+
_spark = fixture.Spark;
20+
}
21+
22+
/// <summary>
23+
/// Test signatures for APIs up to Spark 2.3.*.
24+
/// </summary>
25+
[Fact]
26+
public void TestSignaturesV2_3_X()
27+
{
28+
DataFrame df = _spark
29+
.ReadStream()
30+
.Format("rate")
31+
.Option("rowsPerSecond", 1)
32+
.Load();
33+
34+
DataStreamWriter dsw = df.WriteStream();
35+
36+
Assert.IsType<DataStreamWriter>(dsw.OutputMode("append"));
37+
38+
Assert.IsType<DataStreamWriter>(dsw.OutputMode(OutputMode.Append));
39+
40+
Assert.IsType<DataStreamWriter>(dsw.Format("json"));
41+
42+
Assert.IsType<DataStreamWriter>(dsw.Option("stringOption", "value"));
43+
Assert.IsType<DataStreamWriter>(dsw.Option("boolOption", true));
44+
Assert.IsType<DataStreamWriter>(dsw.Option("longOption", 1L));
45+
Assert.IsType<DataStreamWriter>(dsw.Option("doubleOption", 3D));
46+
47+
Assert.IsType<DataStreamWriter>(
48+
dsw.Options(
49+
new Dictionary<string, string>
50+
{
51+
{ "option1", "value1" },
52+
{ "option2", "value2" }
53+
}));
54+
55+
Assert.IsType<DataStreamWriter>(dsw.PartitionBy("age"));
56+
Assert.IsType<DataStreamWriter>(dsw.PartitionBy("age", "name"));
57+
58+
Assert.IsType<DataStreamWriter>(dsw.QueryName("queryName"));
59+
60+
Assert.IsType<DataStreamWriter>(dsw.Trigger(Trigger.Once()));
61+
}
62+
}
63+
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/TriggerTests.cs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,6 @@ namespace Microsoft.Spark.E2ETest.IpcTests
1111
[Collection("Spark E2E Tests")]
1212
public class TriggerTests
1313
{
14-
private readonly SparkSession _spark;
15-
16-
public TriggerTests(SparkFixture fixture)
17-
{
18-
_spark = fixture.Spark;
19-
}
20-
2114
/// <summary>
2215
/// Test Trigger's static functions
2316
/// </summary>

src/csharp/Microsoft.Spark/Sql/Streaming/DataStreamWriter.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public DataStreamWriter Format(string source)
5959
return this;
6060
}
6161

62+
/// <summary>
63+
/// Partitions the output by the given columns on the file system. If specified,
64+
/// the output is laid out on the file system similar to Hive's partitioning scheme.
65+
/// </summary>
66+
/// <param name="colNames">Column names to partition by</param>
67+
/// <returns>This DataStreamWriter object</returns>
68+
public DataStreamWriter PartitionBy(params string[] colNames)
69+
{
70+
_jvmObject.Invoke("partitionBy", (object)colNames);
71+
return this;
72+
}
73+
6274
/// <summary>
6375
/// Adds an output option for the underlying data source.
6476
/// </summary>

0 commit comments

Comments
 (0)