Support Spark 3.1.1 #836

Merged · 7 commits · Mar 17, 2021

58 changes: 58 additions & 0 deletions azure-pipelines.yml
@@ -44,6 +44,54 @@ variables:
forwardCompatibleTestOptions_Windows_3_0_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Linux_3_0_2: $(forwardCompatibleTestOptions_Windows_3_0_2)

Contributor: So it seems that we need 2.0? Assuming we release 1.1, if the client is using 1.1 for Spark 3.1 and the worker installed is 1.0, things will break. Right?

@suhsteve (Member, Author), Mar 14, 2021: Yes, the 1.0.0 Worker will break any time we introduce a version that supports a new minor version of Spark and the user is using that version.

Collaborator: Quick question: why is the above a must? If the Spark minor version doesn't involve changes to how the stream is sent, can't we just do what Terry suggests here?

@suhsteve (Member, Author): No one said it's a must; it's just the current pattern we have set up. The suggestion Terry brought up was already discussed here.
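To make the failure mode concrete, here is a deliberately simplified, hypothetical sketch of the Worker-side version dispatch (the real logic lives in CommandProcessor.cs and TaskContextProcessor.cs; all names below are illustrative):

using System;

internal static class ProcessorResolver
{
    // A 1.0.0-era Worker only knows the Spark versions it shipped with, so a
    // newer Spark minor version falls through to the default arm and throws.
    internal static string Resolve(Version version) =>
        (version.Major, version.Minor) switch
        {
            (2, 3) => "V2_3_X processor",
            (2, 4) => "V2_4_X processor",
            (3, 0) => "V3_0_X processor",
            _ => throw new NotSupportedException($"Spark {version} not supported.")
        };
}

// ProcessorResolver.Resolve(new Version("3.1.1")) throws NotSupportedException,
// which is why a 1.0 Worker breaks under a client targeting Spark 3.1.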

# Skip backwardCompatible tests because Microsoft.Spark.Worker requires Spark 3.1 support in
# CommandProcessor.cs and TaskContextProcessor.cs. Support added in https://github.com/dotnet/spark/pull/836
backwardCompatibleTestOptions_Windows_3_1: "--filter \
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfRegistrationWithReturnAsRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayChain)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithSimpleArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithRowArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsArrayOfArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayOfArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithMapOfMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsSimpleArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestExternalStaticMethodCall)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestInitExternalClassInUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestUdfClosure)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithReturnAsDateType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithReturnAsTimestampType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithDateType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithTimestampType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcast)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestUnpersist)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.PairRDDFunctionsTests.TestCollect)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestPipelinedRDD)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMap)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestFlatMap)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMapPartitions)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMapPartitionsWithIndex)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestTextFile)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestFilter)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestWithColumn)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestUDF)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.SparkSessionExtensionsTests.TestVersion)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataStreamWriterTests.TestForeachBatch)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataStreamWriterTests.TestForeach)"
# Skip all forwardCompatible tests since microsoft-spark-3-1 jar does not get built when
# building forwardCompatible repo.
forwardCompatibleTestOptions_Windows_3_1: "--filter FullyQualifiedName=NONE"
backwardCompatibleTestOptions_Linux_3_1: $(backwardCompatibleTestOptions_Windows_3_1)
forwardCompatibleTestOptions_Linux_3_1: $(forwardCompatibleTestOptions_Windows_3_1)

# Azure DevOps variables are transformed into environment variables; with these
# variables we avoid the first-time experience and telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -361,3 +409,13 @@ stages:
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_0)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_0_2)
- version: '3.1.1'
jobOptions:
- pool: 'Hosted VS2017'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_1)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_1)
- pool: 'Hosted Ubuntu 1604'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_1)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_1)
@@ -30,7 +30,11 @@ public DeltaTableTests(DeltaFixture fixture)
/// Run the end-to-end scenario from the Delta Quickstart tutorial.
/// </summary>
/// <see cref="https://docs.delta.io/latest/quick-start.html"/>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
///
/// Delta 0.8.0 is not compatible with Spark 3.1.1
/// Disable Delta tests that have code paths that create an
/// `org.apache.spark.sql.catalyst.expressions.Alias` object.
[SkipIfSparkVersionIsNotInRange(Versions.V2_4_2, Versions.V3_1_1)]
@suhsteve (Member, Author): Disable Delta tests that have code paths that create an org.apache.spark.sql.catalyst.expressions.Alias object. Delta 0.8.0 is not compatible with Spark 3.1.1: delta-io/delta#594

Contributor: I think you should add this comment to the code.
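For reference, a minimal sketch of how a version-range skip attribute like this can be built on xunit. This is hypothetical, not the repo's actual helper; in particular, the bound semantics (inclusive lower, exclusive upper) and the SPARK_VERSION environment-variable lookup are assumptions:

using System;
using Xunit;

public sealed class SkipIfSparkVersionIsNotInRangeAttribute : FactAttribute
{
    public SkipIfSparkVersionIsNotInRangeAttribute(string minInclusive, string maxExclusive)
    {
        // Stand-in for however the test suite resolves the running Spark version.
        var version = new Version(
            Environment.GetEnvironmentVariable("SPARK_VERSION") ?? "0.0.0");
        if (version < new Version(minInclusive) || version >= new Version(maxExclusive))
        {
            // Setting FactAttribute.Skip makes xunit report the test as skipped.
            Skip = $"Skipped on Spark {version}: outside [{minInclusive}, {maxExclusive}).";
        }
    }
}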

public void TestTutorialScenario()
{
using var tempDirectory = new TemporaryDirectory();
@@ -223,7 +227,11 @@ void testWrapper(
/// <summary>
/// Test that methods return the expected signature.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
///
/// Delta 0.8.0 is not compatible with Spark 3.1.1
/// Disable Delta tests that have code paths that create an
/// `org.apache.spark.sql.catalyst.expressions.Alias` object.
[SkipIfSparkVersionIsNotInRange(Versions.V2_4_2, Versions.V3_1_1)]
public void TestSignaturesV2_4_X()
{
using var tempDirectory = new TemporaryDirectory();
@@ -69,12 +69,7 @@ public void TestSignaturesV2_3_X()

// TODO: Test dfw.Jdbc without running a local db.

dfw.Option("path", tempDir.Path).SaveAsTable("TestTable");

dfw.InsertInto("TestTable");

dfw.Option("path", $"{tempDir.Path}TestSavePath1").Save();
dfw.Save($"{tempDir.Path}TestSavePath2");
dfw.Save($"{tempDir.Path}TestSavePath1");

dfw.Json($"{tempDir.Path}TestJsonPath");

@@ -85,6 +80,16 @@ public void TestSignaturesV2_3_X()
dfw.Text($"{tempDir.Path}TestTextPath");

dfw.Csv($"{tempDir.Path}TestCsvPath");

@suhsteve (Member, Author): Setting the path option and then calling save is not supported by default unless the spark.sql.legacy.pathOptionBehavior.enabled conf is set. Reversed the order in the test.

https://github.com/apache/spark/blob/bb9abb098a1e26c220546616d655380c479b0e42/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L289

Contributor: Same here. I would add this to the code.

dfw.Option("path", tempDir.Path).SaveAsTable("TestTable");

dfw.InsertInto("TestTable");

// In Spark 3.1.1+ setting the `path` Option and then calling .Save(path) is not
// supported unless `spark.sql.legacy.pathOptionBehavior.enabled` conf is set.
// .Json(path), .Parquet(path), etc follow the same code path so the conf
// needs to be set in these scenarios as well.
dfw.Option("path", $"{tempDir.Path}TestSavePath2").Save();
}
}
}
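If the old ordering is ever actually needed, the legacy behavior can be re-enabled via the conf named in the comment above. A minimal sketch (the conf key comes from the linked Spark source; the session setup and paths are illustrative):

using Microsoft.Spark.Sql;

SparkSession spark = SparkSession
    .Builder()
    // Restores the pre-3.1 behavior of honoring the "path" option in Save()/Load().
    .Config("spark.sql.legacy.pathOptionBehavior.enabled", "true")
    .GetOrCreate();

DataFrame df = spark.Range(10);
df.Write().Mode("overwrite").Option("path", "/tmp/TestSavePath1").Save();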
@@ -52,7 +52,6 @@ public void TestSignaturesV2_3_X()
}));

string jsonFilePath = Path.Combine(TestEnvironment.ResourceDirectory, "people.json");
Assert.IsType<DataFrame>(dsr.Format("json").Option("path", jsonFilePath).Load());
Assert.IsType<DataFrame>(dsr.Format("json").Load(jsonFilePath));
Assert.IsType<DataFrame>(dsr.Json(jsonFilePath));
Assert.IsType<DataFrame>(
@@ -63,6 +62,12 @@ public void TestSignaturesV2_3_X()
dsr.Parquet(Path.Combine(TestEnvironment.ResourceDirectory, "users.parquet")));
Assert.IsType<DataFrame>
(dsr.Text(Path.Combine(TestEnvironment.ResourceDirectory, "people.txt")));

// In Spark 3.1.1+ setting the `path` Option and then calling .Load(path) is not
// supported unless `spark.sql.legacy.pathOptionBehavior.enabled` conf is set.
// .Json(path), .Parquet(path), etc follow the same code path so the conf
// needs to be set in these scenarios as well.
Assert.IsType<DataFrame>(dsr.Format("json").Option("path", jsonFilePath).Load());
@suhsteve (Member, Author): Setting the path option and then calling load is not supported by default unless the spark.sql.legacy.pathOptionBehavior.enabled conf is set. Reversed the order in the test.

https://github.com/apache/spark/blob/bb9abb098a1e26c220546616d655380c479b0e42/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L259

Contributor: Ditto.

}
}
}
@@ -99,7 +99,7 @@ private static SqlCommand[] ReadSqlCommands(
{
(2, 3) => SqlCommandProcessorV2_3_X.Process(evalType, stream),
(2, 4) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
(3, 0) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
(3, _) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
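The discard in (3, _) is what makes this dispatch future-proof: it matches every Spark 3.x minor version, not just 3.0. A tiny standalone demonstration of the tuple pattern (the same change is applied in TaskContextProcessor.cs, Broadcast.cs, and DataFrame.cs below):

using System;

static string Pick(Version v) => (v.Major, v.Minor) switch
{
    (2, 4) => "V2_4_X",
    (3, _) => "V2_4_X", // 3.0, 3.1, 3.2, ... all take this arm
    _ => throw new NotSupportedException($"Spark {v} not supported.")
};

Console.WriteLine(Pick(new Version("3.0.0"))); // V2_4_X
Console.WriteLine(Pick(new Version("3.1.1"))); // V2_4_X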
@@ -23,7 +23,7 @@ internal TaskContext Process(Stream stream)
{
(2, 3) => TaskContextProcessorV2_3_X.Process(stream),
(2, 4) => TaskContextProcessorV2_4_X.Process(stream),
(3, 0) => TaskContextProcessorV3_0_X.Process(stream),
(3, _) => TaskContextProcessorV3_0_X.Process(stream),
_ => throw new NotSupportedException($"Spark {_version} not supported.")
};
}
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Broadcast.cs
@@ -129,7 +129,7 @@ private JvmObjectReference CreateBroadcast(SparkContext sc, T value)
CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value),
(2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(3, 0) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(3, _) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Sql/DataFrame.cs
@@ -1057,7 +1057,7 @@ private IEnumerable<Row> GetRows(string funcName, params object[] args)
// string to use for the authentication.
(2, 3, _) => ParseConnectionInfo(result, false),
(2, 4, _) => ParseConnectionInfo(result, false),
(3, 0, _) => ParseConnectionInfo(result, false),
(3, _, _) => ParseConnectionInfo(result, false),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
@@ -13,5 +13,6 @@ internal static class Versions
internal const string V2_4_0 = "2.4.0";
internal const string V2_4_2 = "2.4.2";
internal const string V3_0_0 = "3.0.0";
internal const string V3_1_1 = "3.1.1";
}
}
2 changes: 1 addition & 1 deletion src/scala/microsoft-spark-3-0/pom.xml
@@ -10,7 +10,7 @@
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.8</scala.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
</properties>
1 change: 1 addition & 0 deletions src/scala/pom.xml
@@ -14,6 +14,7 @@
<module>microsoft-spark-2-3</module>
<module>microsoft-spark-2-4</module>
<module>microsoft-spark-3-0</module>
<module>microsoft-spark-3-1</module>
</modules>

<pluginRepositories>