Fix for using Broadcast variables in Databricks #766

Merged on Nov 12, 2020 (58 commits; changes shown from 57 commits)

Commits
03b7939
Adding section for UDF serialization
Niharikadutta Apr 20, 2020
4ef693d
removing guides from master
Niharikadutta Apr 20, 2020
81145ca
Merge latest from master
Niharikadutta May 6, 2020
e4b81af
merging latest from master
Niharikadutta May 7, 2020
4c32173
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 2, 2020
4987a09
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 14, 2020
ca9612e
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 16, 2020
f581c86
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 20, 2020
086b325
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 23, 2020
2f72907
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jul 25, 2020
6bab996
CountVectorizer
Jul 27, 2020
e2a566b
moving private methods to bottom
Jul 27, 2020
5f682a6
changing wrap method
Jul 28, 2020
31371db
setting min version required
Jul 31, 2020
60eb82f
undoing csproj change
Jul 31, 2020
ed36375
member doesnt need to be internal
Jul 31, 2020
c7baf72
too many lines
Jul 31, 2020
d13303c
removing whitespace change
Jul 31, 2020
f5b477c
removing whitespace change
Jul 31, 2020
73db52b
ionide
Jul 31, 2020
98f5e4d
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 7, 2020
4c5d502
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 10, 2020
a766146
Merge branch 'master' into ml/countvectorizer
GoEddie Aug 12, 2020
ad6bced
Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark
Niharikadutta Aug 13, 2020
8e1685c
Revert "Merge branch 'master' into ml/countvectorizer"
Niharikadutta Aug 13, 2020
255515e
Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd…
Niharikadutta Aug 13, 2020
a44c882
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 14, 2020
3c2c936
fixing merge errors
Niharikadutta Aug 14, 2020
88e834d
removing ionid
Niharikadutta Aug 20, 2020
59e7299
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 20, 2020
a13de2d
Merge branch 'master' of github.com:Niharikadutta/spark
Niharikadutta Aug 21, 2020
13d0e4a
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 24, 2020
595b141
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 29, 2020
decfa48
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 2, 2020
ce694ff
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 8, 2020
8128ba0
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 12, 2020
52f0a74
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 19, 2020
6a89f01
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 24, 2020
4b1de41
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 25, 2020
929d8e2
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 26, 2020
ffa0a4d
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 2, 2020
2579faa
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 5, 2020
2297add
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 6, 2020
daade7a
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 8, 2020
cb6aa7a
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 12, 2020
cbe6e50
Merge branch 'master' of github.com:Niharikadutta/spark
Niharikadutta Oct 12, 2020
3a04b19
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 12, 2020
2c498dc
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 13, 2020
d19cfb6
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 16, 2020
d34188e
Merge branch 'master' of github.com:Niharikadutta/spark
Niharikadutta Oct 16, 2020
5457ffb
Merge remote-tracking branch 'upstream/master'
Niharikadutta Oct 26, 2020
2a44453
Merge remote-tracking branch 'upstream/master'
Niharikadutta Nov 4, 2020
ebfd45a
Fix for calling Broadcast in Databricks
Niharikadutta Nov 4, 2020
0e09326
Merge branch 'master' into nidutta/databricks_broadcast_fix
Niharikadutta Nov 6, 2020
c01b36f
moving IsDatabricks util to ConfigurationService
Niharikadutta Nov 6, 2020
ba6bc7a
Merge branch 'nidutta/databricks_broadcast_fix' of github.com:Niharik…
Niharikadutta Nov 6, 2020
105478b
PR comments
Niharikadutta Nov 6, 2020
d414755
Update src/csharp/Microsoft.Spark/Broadcast.cs
Niharikadutta Nov 12, 2020
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark.Worker/DaemonWorker.cs
@@ -128,7 +128,7 @@ private void StartServer(ISocketWrapper listener)
 SerDe.Write(socket.OutputStream, taskRunnerId);
 socket.OutputStream.Flush();

-if (Utils.SettingUtils.IsDatabricks)
+if (ConfigurationService.IsDatabricks)
 {
     SerDe.ReadString(socket.InputStream);
 }
@@ -7,6 +7,7 @@
 using System.Collections.Generic;
 using System.IO;
 using Microsoft.Spark.Interop.Ipc;
+using Microsoft.Spark.Services;
 using Microsoft.Spark.Worker.Utils;

 namespace Microsoft.Spark.Worker.Processor
@@ -68,7 +69,7 @@ internal Payload Process(Stream stream)
 // our current context.
 AssemblyLoaderHelper.RegisterAssemblyHandler();

-if (SettingUtils.IsDatabricks)
+if (ConfigurationService.IsDatabricks)
 {
     SerDe.ReadString(stream);
     SerDe.ReadString(stream);
3 changes: 0 additions & 3 deletions src/csharp/Microsoft.Spark.Worker/Utils/SettingUtils.cs
@@ -12,9 +12,6 @@ namespace Microsoft.Spark.Worker.Utils
 /// </summary>
 internal static class SettingUtils
 {
-    internal static bool IsDatabricks { get; } =
-        !string.IsNullOrEmpty(GetEnvironmentVariable("DATABRICKS_RUNTIME_VERSION"));
-
     internal static string GetWorkerFactorySecret(Version version)
     {
         return (version >= new Version(Versions.V2_3_1)) ?
21 changes: 17 additions & 4 deletions src/csharp/Microsoft.Spark/Broadcast.cs
@@ -171,11 +171,24 @@ private JvmObjectReference CreateBroadcast_V2_3_2_AndAbove(
 // Spark versions.
 bool encryptionEnabled = bool.Parse(
     sc.GetConf().Get("spark.io.encryption.enabled", "false"));
+JvmObjectReference _pythonBroadcast;

-var _pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
-    "org.apache.spark.api.python.PythonRDD",
-    "setupBroadcast",
-    _path);
+// Databricks has changed the signature of setupBroadcast in its Spark
+if (ConfigurationService.IsDatabricks)
+{
+    _pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
+        "org.apache.spark.api.python.PythonRDD",
+        "setupBroadcast",
+        javaSparkContext,
+        _path);
+}
+else
+{
+    _pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
+        "org.apache.spark.api.python.PythonRDD",
+        "setupBroadcast",
+        _path);
+}

 if (encryptionEnabled)
 {
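The Broadcast.cs change calls setupBroadcast with an extra leading javaSparkContext argument on Databricks and with only the path elsewhere. A minimal sketch of that argument selection, isolated from the JVM bridge so it can run on its own, might look like the following. The class SetupBroadcastArgs and its Build method are hypothetical names introduced here for illustration; they are not part of Microsoft.Spark.

```csharp
using System;

// Hypothetical helper (not part of Microsoft.Spark): picks the argument list
// for PythonRDD.setupBroadcast based on whether the code runs on Databricks.
static class SetupBroadcastArgs
{
    public static object[] Build(bool isDatabricks, object javaSparkContext, string path) =>
        isDatabricks
            // Databricks variant shown in the diff: (javaSparkContext, path).
            ? new object[] { javaSparkContext, path }
            // Non-Databricks variant shown in the diff: (path).
            : new object[] { path };
}

static class Program
{
    static void Main()
    {
        // A plain object stands in for the real JavaSparkContext reference.
        object fakeContext = new object();

        object[] databricksArgs = SetupBroadcastArgs.Build(true, fakeContext, "/tmp/broadcast");
        object[] defaultArgs = SetupBroadcastArgs.Build(false, fakeContext, "/tmp/broadcast");

        Console.WriteLine(databricksArgs.Length); // 2
        Console.WriteLine(defaultArgs.Length);    // 1
    }
}
```

The PR itself keeps two explicit calls in an if/else, which keeps each signature visible at the call site; a helper like this would only pay off if more call sites needed the same distinction.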
4 changes: 4 additions & 0 deletions src/csharp/Microsoft.Spark/Services/ConfigurationService.cs
@@ -5,6 +5,7 @@
 using System;
 using System.IO;
 using System.Runtime.InteropServices;
+using static System.Environment;
 using Microsoft.Spark.Utils;

 namespace Microsoft.Spark.Services
@@ -49,6 +50,9 @@ public TimeSpan JvmThreadGCInterval
     }
 }

+internal static bool IsDatabricks { get; } =
+    !string.IsNullOrEmpty(GetEnvironmentVariable("DATABRICKS_RUNTIME_VERSION"));
+
 /// <summary>
 /// Returns the port number for socket communication between JVM and CLR.
 /// </summary>
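The IsDatabricks property moved into ConfigurationService treats any non-empty DATABRICKS_RUNTIME_VERSION environment variable as a sign of running on Databricks. The sketch below reproduces that check in a standalone form. The class DatabricksDetection and the method IsDatabricksRuntime are hypothetical names used only for illustration, and "7.3" is an arbitrary sample value.

```csharp
using System;

// Hypothetical stand-in for the check in ConfigurationService; not the library's class.
static class DatabricksDetection
{
    // Pure form of the check: any non-empty value counts as Databricks.
    public static bool IsDatabricksRuntime(string runtimeVersion) =>
        !string.IsNullOrEmpty(runtimeVersion);

    // Same shape as the property in the diff: a static auto-property with an
    // initializer, so the environment variable is read once and then cached.
    public static bool IsDatabricks { get; } =
        IsDatabricksRuntime(Environment.GetEnvironmentVariable("DATABRICKS_RUNTIME_VERSION"));
}

static class Program
{
    static void Main()
    {
        Console.WriteLine(DatabricksDetection.IsDatabricksRuntime(null));  // False
        Console.WriteLine(DatabricksDetection.IsDatabricksRuntime(""));    // False
        Console.WriteLine(DatabricksDetection.IsDatabricksRuntime("7.3")); // True
    }
}
```

Because the property is initialized once per process, changing the environment variable after the first access has no effect on its value.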