-
Notifications
You must be signed in to change notification settings - Fork 327
Exposing DataFrame.Persist(StorageLevel) and DataFrame.StorageLevel APIs #623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
suhsteve
merged 51 commits into
dotnet:master
from
Niharikadutta:dataFrame2.4Completeness
Aug 26, 2020
Merged
Changes from all commits
Commits
Show all changes
51 commits
Select commit
Hold shift + click to select a range
03b7939
Adding section for UDF serialization
Niharikadutta 4ef693d
removing guides from master
Niharikadutta 81145ca
Merge latest from master
Niharikadutta e4b81af
merging latest from master
Niharikadutta 6bab996
CountVectorizer
e2a566b
moving private methods to bottom
5f682a6
changing wrap method
31371db
setting min version required
60eb82f
undoing csproj change
ed36375
member doesnt need to be internal
c7baf72
too many lines
d13303c
removing whitespace change
f5b477c
removing whitespace change
73db52b
ionide
4c5d502
Merge remote-tracking branch 'upstream/master'
Niharikadutta a766146
Merge branch 'master' into ml/countvectorizer
GoEddie 781ca4b
Adding Storagelevel class
Niharikadutta 797b8cb
Adding following missing APIs: Persist(), StorageLevel(), Pivot()
Niharikadutta 50de14d
Exposing APIs in StorageLevel
Niharikadutta 7dbf9b4
Cleanup
Niharikadutta ad6bced
Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark
Niharikadutta 38f0b19
Removing Pivot
Niharikadutta 0fb4034
merging latest
Niharikadutta 44478d7
Update src/csharp/Microsoft.Spark/Sql/DataFrame.cs
Niharikadutta 1529d70
Update src/csharp/Microsoft.Spark/Sql/StorageLevel.cs
Niharikadutta d905e9c
PR review comments
Niharikadutta b471973
Merge branch 'dataFrame2.4Completeness' of github.com:Niharikadutta/s…
Niharikadutta 8e1685c
Revert "Merge branch 'master' into ml/countvectorizer"
Niharikadutta 255515e
Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd…
Niharikadutta a44c882
Merge remote-tracking branch 'upstream/master'
Niharikadutta cbd473e
Merge branch 'master' into dataFrame2.4Completeness
Niharikadutta 3a7ed60
PR comments
Niharikadutta 436d1d4
PR review changes
Niharikadutta 3c2c936
fixing merge errors
Niharikadutta 4839ca5
Merge branch 'master' into dataFrame2.4Completeness
Niharikadutta d0d26d4
newline at end
Niharikadutta 9578746
Changing def to serialized Aliases as Pyspark
Niharikadutta 09d2c46
Exposing all static APIs
Niharikadutta 28e7cb1
PR review
Niharikadutta a655309
PR review comments
Niharikadutta a3782e5
PR comments
Niharikadutta 8ca43ab
PR review, added StorageLevelTests
Niharikadutta 7d2ce33
comment error
Niharikadutta 3b98dd3
Cleaning up comments
Niharikadutta 7c2be4d
nit whitespace
Niharikadutta 133363b
PR review comments
Niharikadutta 1c95768
PR review comments
Niharikadutta 3e256a8
PR review changes
Niharikadutta 87caa46
PR review changes
Niharikadutta 1ae27b5
Update src/csharp/Microsoft.Spark/Sql/StorageLevel.cs
Niharikadutta 644abc4
Merge branch 'master' into dataFrame2.4Completeness
imback82 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
60 changes: 60 additions & 0 deletions
60
src/csharp/Microsoft.Spark.E2ETest/IpcTests/StorageLevelTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using Microsoft.Spark.Sql; | ||
using Xunit; | ||
|
||
namespace Microsoft.Spark.E2ETest.IpcTests | ||
{ | ||
[Collection("Spark E2E Tests")] | ||
public class StorageLevelTests | ||
{ | ||
private readonly SparkSession _spark; | ||
private readonly DataFrame _df; | ||
|
||
public StorageLevelTests(SparkFixture fixture) | ||
{ | ||
_spark = fixture.Spark; | ||
_df = _spark.CreateDataFrame(new[] { "hello", "world" }); | ||
} | ||
|
||
/// <summary> | ||
/// Testing all public properties and methods of StorageLevel objects. | ||
/// </summary> | ||
[Fact] | ||
public void TestStorageLevelProperties() | ||
{ | ||
var storageLevels = new List<StorageLevel> { | ||
StorageLevel.NONE, | ||
StorageLevel.DISK_ONLY, | ||
StorageLevel.DISK_ONLY_2, | ||
StorageLevel.MEMORY_ONLY, | ||
StorageLevel.MEMORY_ONLY_2, | ||
StorageLevel.MEMORY_ONLY_SER, | ||
StorageLevel.MEMORY_ONLY_SER_2, | ||
StorageLevel.MEMORY_AND_DISK, | ||
StorageLevel.MEMORY_AND_DISK_2, | ||
StorageLevel.MEMORY_AND_DISK_SER, | ||
StorageLevel.MEMORY_AND_DISK_SER_2, | ||
StorageLevel.OFF_HEAP | ||
}; | ||
foreach (StorageLevel expected in storageLevels) | ||
{ | ||
_df.Persist(expected); | ||
StorageLevel actual = _df.StorageLevel(); | ||
Assert.Equal(expected, actual); | ||
// Needs to be unpersisted so other Persists can take effect. | ||
_df.Unpersist(); | ||
} | ||
|
||
StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK; | ||
Assert.True(storageLevel.UseDisk); | ||
Assert.True(storageLevel.UseMemory); | ||
Assert.False(storageLevel.UseOffHeap); | ||
Assert.True(storageLevel.Deserialized); | ||
Assert.Equal(1, storageLevel.Replication); | ||
|
||
Assert.IsType<string>(storageLevel.Description()); | ||
Assert.IsType<string>(storageLevel.ToString()); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
// Licensed to the .NET Foundation under one or more agreements. | ||
// The .NET Foundation licenses this file to you under the MIT license. | ||
// See the LICENSE file in the project root for more information. | ||
|
||
using System; | ||
using Microsoft.Spark.Interop; | ||
using Microsoft.Spark.Interop.Ipc; | ||
|
||
namespace Microsoft.Spark.Sql | ||
{ | ||
/// <summary> | ||
/// Flags for controlling the storage of an RDD. Each StorageLevel records whether to use | ||
/// memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the | ||
/// data in memory in a JAVA-specific serialized format, and whether to replicate the RDD | ||
/// partitions on multiple nodes. Also contains static properties for some commonly used | ||
/// storage levels, MEMORY_ONLY. | ||
/// </summary> | ||
public sealed class StorageLevel : IJvmObjectReferenceProvider | ||
{ | ||
private static readonly string s_storageLevelClassName = | ||
"org.apache.spark.storage.StorageLevel"; | ||
private static StorageLevel s_none; | ||
private static StorageLevel s_diskOnly; | ||
private static StorageLevel s_diskOnly2; | ||
private static StorageLevel s_memoryOnly; | ||
private static StorageLevel s_memoryOnly2; | ||
private static StorageLevel s_memoryOnlySer; | ||
private static StorageLevel s_memoryOnlySer2; | ||
private static StorageLevel s_memoryAndDisk; | ||
private static StorageLevel s_memoryAndDisk2; | ||
private static StorageLevel s_memoryAndDiskSer; | ||
private static StorageLevel s_memoryAndDiskSer2; | ||
private static StorageLevel s_offHeap; | ||
Niharikadutta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private readonly JvmObjectReference _jvmObject; | ||
private bool? _useDisk; | ||
private bool? _useMemory; | ||
private bool? _useOffHeap; | ||
private bool? _deserialized; | ||
private int? _replication; | ||
Niharikadutta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
internal StorageLevel(JvmObjectReference jvmObject) | ||
{ | ||
_jvmObject = jvmObject; | ||
} | ||
|
||
public StorageLevel( | ||
Niharikadutta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
bool useDisk, | ||
bool useMemory, | ||
bool useOffHeap, | ||
bool deserialized, | ||
int replication = 1) | ||
: this( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"apply", | ||
useDisk, | ||
useMemory, | ||
useOffHeap, | ||
deserialized, | ||
replication)) | ||
{ | ||
_useDisk = useDisk; | ||
_useMemory = useMemory; | ||
_useOffHeap = useOffHeap; | ||
_deserialized = deserialized; | ||
_replication = replication; | ||
} | ||
|
||
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject; | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel object with all parameters set to false. | ||
/// </summary> | ||
public static StorageLevel NONE => | ||
s_none ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"NONE")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk, serialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel DISK_ONLY => | ||
s_diskOnly ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"DISK_ONLY")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk, serialized and replicated twice. | ||
/// </summary> | ||
public static StorageLevel DISK_ONLY_2 => | ||
s_diskOnly2 ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"DISK_ONLY_2")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Memory, deserialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel MEMORY_ONLY => | ||
s_memoryOnly ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_ONLY")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Memory, deserialized and replicated twice. | ||
/// </summary> | ||
public static StorageLevel MEMORY_ONLY_2 => | ||
s_memoryOnly2 ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_ONLY_2")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Memory, serialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel MEMORY_ONLY_SER => | ||
s_memoryOnlySer ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_ONLY_SER")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Memory, serialized and replicated twice. | ||
/// </summary> | ||
public static StorageLevel MEMORY_ONLY_SER_2 => | ||
s_memoryOnlySer2 ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_ONLY_SER_2")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk and Memory, deserialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel MEMORY_AND_DISK => | ||
s_memoryAndDisk ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_AND_DISK")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk and Memory, deserialized and replicated twice. | ||
/// </summary> | ||
public static StorageLevel MEMORY_AND_DISK_2 => | ||
s_memoryAndDisk2 ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_AND_DISK_2")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk and Memory, serialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel MEMORY_AND_DISK_SER => | ||
s_memoryAndDiskSer ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_AND_DISK_SER")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk and Memory, serialized and replicated twice. | ||
/// </summary> | ||
public static StorageLevel MEMORY_AND_DISK_SER_2 => | ||
s_memoryAndDiskSer2 ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"MEMORY_AND_DISK_SER_2")); | ||
|
||
/// <summary> | ||
/// Returns the StorageLevel to Disk, Memory and Offheap, serialized and replicated once. | ||
/// </summary> | ||
public static StorageLevel OFF_HEAP => | ||
s_offHeap ??= new StorageLevel( | ||
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod( | ||
s_storageLevelClassName, | ||
"OFF_HEAP")); | ||
|
||
/// <summary> | ||
/// Returns bool value of UseDisk of this StorageLevel. | ||
/// </summary> | ||
public bool UseDisk => _useDisk ??= (bool)_jvmObject.Invoke("useDisk"); | ||
|
||
/// <summary> | ||
/// Returns bool value of UseMemory of this StorageLevel. | ||
/// </summary> | ||
public bool UseMemory => _useMemory ??= (bool)_jvmObject.Invoke("useMemory"); | ||
|
||
/// <summary> | ||
/// Returns bool value of UseOffHeap of this StorageLevel. | ||
/// </summary> | ||
public bool UseOffHeap => _useOffHeap ??= (bool)_jvmObject.Invoke("useOffHeap"); | ||
|
||
/// <summary> | ||
/// Returns bool value of Deserialized of this StorageLevel. | ||
/// </summary> | ||
public bool Deserialized => _deserialized ??= (bool)_jvmObject.Invoke("deserialized"); | ||
|
||
/// <summary> | ||
/// Returns int value of Replication of this StorageLevel. | ||
/// </summary> | ||
public int Replication => _replication ??= (int)_jvmObject.Invoke("replication"); | ||
|
||
/// <summary> | ||
/// Returns the description string of this StorageLevel. | ||
/// </summary> | ||
/// <returns>Description as string.</returns> | ||
public string Description() => (string)_jvmObject.Invoke("description"); | ||
|
||
/// <summary> | ||
/// Returns the string representation of this StorageLevel. | ||
/// </summary> | ||
/// <returns>representation as string value.</returns> | ||
public override string ToString() => (string)_jvmObject.Invoke("toString"); | ||
|
||
/// <summary> | ||
/// Checks if the given object is same as the current object. | ||
/// </summary> | ||
/// <param name="obj">Other object to compare against</param> | ||
/// <returns>True if the other object is equal.</returns> | ||
public override bool Equals(object obj) | ||
{ | ||
if (!(obj is StorageLevel that)) | ||
{ | ||
return false; | ||
} | ||
|
||
return (UseDisk == that.UseDisk) && (UseMemory == that.UseMemory) && | ||
(UseOffHeap == that.UseOffHeap) && (Deserialized == that.Deserialized) && | ||
(Replication == that.Replication); | ||
} | ||
|
||
Niharikadutta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// <summary> | ||
/// Returns the hash code of the current object. | ||
/// </summary> | ||
/// <returns>The hash code of the current object</returns> | ||
public override int GetHashCode() => base.GetHashCode(); | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.