Skip to content

Exposing DataFrame.Persist(StorageLevel) and DataFrame.StorageLevel APIs #623

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 51 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
03b7939
Adding section for UDF serialization
Niharikadutta Apr 20, 2020
4ef693d
removing guides from master
Niharikadutta Apr 20, 2020
81145ca
Merge latest from master
Niharikadutta May 6, 2020
e4b81af
merging latest from master
Niharikadutta May 7, 2020
6bab996
CountVectorizer
Jul 27, 2020
e2a566b
moving private methods to bottom
Jul 27, 2020
5f682a6
changing wrap method
Jul 28, 2020
31371db
setting min version required
Jul 31, 2020
60eb82f
undoing csproj change
Jul 31, 2020
ed36375
member doesnt need to be internal
Jul 31, 2020
c7baf72
too many lines
Jul 31, 2020
d13303c
removing whitespace change
Jul 31, 2020
f5b477c
removing whitespace change
Jul 31, 2020
73db52b
ionide
Jul 31, 2020
4c5d502
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 10, 2020
a766146
Merge branch 'master' into ml/countvectorizer
GoEddie Aug 12, 2020
781ca4b
Adding Storagelevel class
Niharikadutta Aug 12, 2020
797b8cb
Adding following missing APIs: Persist(), StorageLevel(), Pivot()
Niharikadutta Aug 12, 2020
50de14d
Exposing APIs in StorageLevel
Niharikadutta Aug 13, 2020
7dbf9b4
Cleanup
Niharikadutta Aug 13, 2020
ad6bced
Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark
Niharikadutta Aug 13, 2020
38f0b19
Removing Pivot
Niharikadutta Aug 13, 2020
0fb4034
merging latest
Niharikadutta Aug 13, 2020
44478d7
Update src/csharp/Microsoft.Spark/Sql/DataFrame.cs
Niharikadutta Aug 13, 2020
1529d70
Update src/csharp/Microsoft.Spark/Sql/StorageLevel.cs
Niharikadutta Aug 13, 2020
d905e9c
PR review comments
Niharikadutta Aug 13, 2020
b471973
Merge branch 'dataFrame2.4Completeness' of github.com:Niharikadutta/s…
Niharikadutta Aug 13, 2020
8e1685c
Revert "Merge branch 'master' into ml/countvectorizer"
Niharikadutta Aug 13, 2020
255515e
Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd…
Niharikadutta Aug 13, 2020
a44c882
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 14, 2020
cbd473e
Merge branch 'master' into dataFrame2.4Completeness
Niharikadutta Aug 14, 2020
3a7ed60
PR comments
Niharikadutta Aug 14, 2020
436d1d4
PR review changes
Niharikadutta Aug 14, 2020
3c2c936
fixing merge errors
Niharikadutta Aug 14, 2020
4839ca5
Merge branch 'master' into dataFrame2.4Completeness
Niharikadutta Aug 14, 2020
d0d26d4
newline at end
Niharikadutta Aug 14, 2020
9578746
Changing def to serialized Aliases as Pyspark
Niharikadutta Aug 14, 2020
09d2c46
Exposing all static APIs
Niharikadutta Aug 15, 2020
28e7cb1
PR review
Niharikadutta Aug 16, 2020
a655309
PR review comments
Niharikadutta Aug 21, 2020
a3782e5
PR comments
Niharikadutta Aug 21, 2020
8ca43ab
PR review, added StorageLevelTests
Niharikadutta Aug 21, 2020
7d2ce33
comment error
Niharikadutta Aug 21, 2020
3b98dd3
Cleaning up comments
Niharikadutta Aug 21, 2020
7c2be4d
nit whitespace
Niharikadutta Aug 21, 2020
133363b
PR review comments
Niharikadutta Aug 23, 2020
1c95768
PR review comments
Niharikadutta Aug 24, 2020
3e256a8
PR review changes
Niharikadutta Aug 25, 2020
87caa46
PR review changes
Niharikadutta Aug 26, 2020
1ae27b5
Update src/csharp/Microsoft.Spark/Sql/StorageLevel.cs
Niharikadutta Aug 26, 2020
644abc4
Merge branch 'master' into dataFrame2.4Completeness
imback82 Aug 26, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,12 @@ public void TestSignaturesV2_3_X()

Assert.IsType<DataFrame>(_df.Persist());

Assert.IsType<DataFrame>(_df.Persist(StorageLevel.DISK_ONLY));

Assert.IsType<DataFrame>(_df.Cache());

Assert.IsType<StorageLevel>(_df.StorageLevel());

Assert.IsType<DataFrame>(_df.Unpersist());

_df.CreateTempView("view");
Expand Down
60 changes: 60 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/StorageLevelTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Collection("Spark E2E Tests")]
public class StorageLevelTests
{
private readonly SparkSession _spark;
private readonly DataFrame _df;

public StorageLevelTests(SparkFixture fixture)
{
_spark = fixture.Spark;
_df = _spark.CreateDataFrame(new[] { "hello", "world" });
}

/// <summary>
/// Testing all public properties and methods of StorageLevel objects.
/// </summary>
[Fact]
public void TestStorageLevelProperties()
{
var storageLevels = new List<StorageLevel> {
StorageLevel.NONE,
StorageLevel.DISK_ONLY,
StorageLevel.DISK_ONLY_2,
StorageLevel.MEMORY_ONLY,
StorageLevel.MEMORY_ONLY_2,
StorageLevel.MEMORY_ONLY_SER,
StorageLevel.MEMORY_ONLY_SER_2,
StorageLevel.MEMORY_AND_DISK,
StorageLevel.MEMORY_AND_DISK_2,
StorageLevel.MEMORY_AND_DISK_SER,
StorageLevel.MEMORY_AND_DISK_SER_2,
StorageLevel.OFF_HEAP
};
foreach (StorageLevel expected in storageLevels)
{
_df.Persist(expected);
StorageLevel actual = _df.StorageLevel();
Assert.Equal(expected, actual);
// Needs to be unpersisted so other Persists can take effect.
_df.Unpersist();
}

StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK;
Assert.True(storageLevel.UseDisk);
Assert.True(storageLevel.UseMemory);
Assert.False(storageLevel.UseOffHeap);
Assert.True(storageLevel.Deserialized);
Assert.Equal(1, storageLevel.Replication);

Assert.IsType<string>(storageLevel.Description());
Assert.IsType<string>(storageLevel.ToString());
}
}
}
25 changes: 19 additions & 6 deletions src/csharp/Microsoft.Spark/Sql/DataFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@

namespace Microsoft.Spark.Sql
{
/// TODO:
/// Missing APIs:
/// Persist() with "StorageLevel"

/// <summary>
/// A distributed collection of data organized into named columns.
/// </summary>
Expand Down Expand Up @@ -808,17 +804,34 @@ public DataFrame Coalesce(int numPartitions) =>
public DataFrame Distinct() => WrapAsDataFrame(_jvmObject.Invoke("distinct"));

/// <summary>
/// Persist this `DataFrame` with the default storage level (`MEMORY_AND_DISK`).
/// Persist this <see cref="DataFrame"/> with the default storage level MEMORY_AND_DISK.
/// </summary>
/// <returns>DataFrame object</returns>
public DataFrame Persist() => WrapAsDataFrame(_jvmObject.Invoke("persist"));

/// <summary>
/// Persist this `DataFrame` with the default storage level (`MEMORY_AND_DISK`).
/// Persist this <see cref="DataFrame"/> with the given storage level.
/// </summary>
/// <param name="storageLevel">
/// <see cref="StorageLevel"/> to persist the <see cref="DataFrame"/> to.
/// </param>
/// <returns>DataFrame object</returns>
public DataFrame Persist(StorageLevel storageLevel) =>
WrapAsDataFrame(_jvmObject.Invoke("persist", storageLevel));

/// <summary>
/// Persist this <see cref="DataFrame"/> with the default storage level MEMORY_AND_DISK.
/// </summary>
/// <returns>DataFrame object</returns>
public DataFrame Cache() => WrapAsDataFrame(_jvmObject.Invoke("cache"));

/// <summary>
/// Get the <see cref="DataFrame"/>'s current <see cref="StorageLevel"/>.
/// </summary>
/// <returns><see cref="StorageLevel"/> object</returns>
public StorageLevel StorageLevel() =>
new StorageLevel((JvmObjectReference)_jvmObject.Invoke("storageLevel"));

/// <summary>
/// Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk.
/// </summary>
Expand Down
239 changes: 239 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/StorageLevel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Sql
{
/// <summary>
/// Flags for controlling the storage of an RDD. Each StorageLevel records whether to use
/// memory, whether to drop the RDD to disk if it falls out of memory, whether to keep the
/// data in memory in a JAVA-specific serialized format, and whether to replicate the RDD
/// partitions on multiple nodes. Also contains static properties for some commonly used
/// storage levels, MEMORY_ONLY.
/// </summary>
public sealed class StorageLevel : IJvmObjectReferenceProvider
{
private static readonly string s_storageLevelClassName =
"org.apache.spark.storage.StorageLevel";
private static StorageLevel s_none;
private static StorageLevel s_diskOnly;
private static StorageLevel s_diskOnly2;
private static StorageLevel s_memoryOnly;
private static StorageLevel s_memoryOnly2;
private static StorageLevel s_memoryOnlySer;
private static StorageLevel s_memoryOnlySer2;
private static StorageLevel s_memoryAndDisk;
private static StorageLevel s_memoryAndDisk2;
private static StorageLevel s_memoryAndDiskSer;
private static StorageLevel s_memoryAndDiskSer2;
private static StorageLevel s_offHeap;
private readonly JvmObjectReference _jvmObject;
private bool? _useDisk;
private bool? _useMemory;
private bool? _useOffHeap;
private bool? _deserialized;
private int? _replication;

internal StorageLevel(JvmObjectReference jvmObject)
{
_jvmObject = jvmObject;
}

public StorageLevel(
bool useDisk,
bool useMemory,
bool useOffHeap,
bool deserialized,
int replication = 1)
: this(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"apply",
useDisk,
useMemory,
useOffHeap,
deserialized,
replication))
{
_useDisk = useDisk;
_useMemory = useMemory;
_useOffHeap = useOffHeap;
_deserialized = deserialized;
_replication = replication;
}

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

/// <summary>
/// Returns the StorageLevel object with all parameters set to false.
/// </summary>
public static StorageLevel NONE =>
s_none ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"NONE"));

/// <summary>
/// Returns the StorageLevel to Disk, serialized and replicated once.
/// </summary>
public static StorageLevel DISK_ONLY =>
s_diskOnly ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"DISK_ONLY"));

/// <summary>
/// Returns the StorageLevel to Disk, serialized and replicated twice.
/// </summary>
public static StorageLevel DISK_ONLY_2 =>
s_diskOnly2 ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"DISK_ONLY_2"));

/// <summary>
/// Returns the StorageLevel to Memory, deserialized and replicated once.
/// </summary>
public static StorageLevel MEMORY_ONLY =>
s_memoryOnly ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_ONLY"));

/// <summary>
/// Returns the StorageLevel to Memory, deserialized and replicated twice.
/// </summary>
public static StorageLevel MEMORY_ONLY_2 =>
s_memoryOnly2 ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_ONLY_2"));

/// <summary>
/// Returns the StorageLevel to Memory, serialized and replicated once.
/// </summary>
public static StorageLevel MEMORY_ONLY_SER =>
s_memoryOnlySer ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_ONLY_SER"));

/// <summary>
/// Returns the StorageLevel to Memory, serialized and replicated twice.
/// </summary>
public static StorageLevel MEMORY_ONLY_SER_2 =>
s_memoryOnlySer2 ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_ONLY_SER_2"));

/// <summary>
/// Returns the StorageLevel to Disk and Memory, deserialized and replicated once.
/// </summary>
public static StorageLevel MEMORY_AND_DISK =>
s_memoryAndDisk ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_AND_DISK"));

/// <summary>
/// Returns the StorageLevel to Disk and Memory, deserialized and replicated twice.
/// </summary>
public static StorageLevel MEMORY_AND_DISK_2 =>
s_memoryAndDisk2 ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_AND_DISK_2"));

/// <summary>
/// Returns the StorageLevel to Disk and Memory, serialized and replicated once.
/// </summary>
public static StorageLevel MEMORY_AND_DISK_SER =>
s_memoryAndDiskSer ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_AND_DISK_SER"));

/// <summary>
/// Returns the StorageLevel to Disk and Memory, serialized and replicated twice.
/// </summary>
public static StorageLevel MEMORY_AND_DISK_SER_2 =>
s_memoryAndDiskSer2 ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"MEMORY_AND_DISK_SER_2"));

/// <summary>
/// Returns the StorageLevel to Disk, Memory and Offheap, serialized and replicated once.
/// </summary>
public static StorageLevel OFF_HEAP =>
s_offHeap ??= new StorageLevel(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_storageLevelClassName,
"OFF_HEAP"));

/// <summary>
/// Returns bool value of UseDisk of this StorageLevel.
/// </summary>
public bool UseDisk => _useDisk ??= (bool)_jvmObject.Invoke("useDisk");

/// <summary>
/// Returns bool value of UseMemory of this StorageLevel.
/// </summary>
public bool UseMemory => _useMemory ??= (bool)_jvmObject.Invoke("useMemory");

/// <summary>
/// Returns bool value of UseOffHeap of this StorageLevel.
/// </summary>
public bool UseOffHeap => _useOffHeap ??= (bool)_jvmObject.Invoke("useOffHeap");

/// <summary>
/// Returns bool value of Deserialized of this StorageLevel.
/// </summary>
public bool Deserialized => _deserialized ??= (bool)_jvmObject.Invoke("deserialized");

/// <summary>
/// Returns int value of Replication of this StorageLevel.
/// </summary>
public int Replication => _replication ??= (int)_jvmObject.Invoke("replication");

/// <summary>
/// Returns the description string of this StorageLevel.
/// </summary>
/// <returns>Description as string.</returns>
public string Description() => (string)_jvmObject.Invoke("description");

/// <summary>
/// Returns the string representation of this StorageLevel.
/// </summary>
/// <returns>representation as string value.</returns>
public override string ToString() => (string)_jvmObject.Invoke("toString");

/// <summary>
/// Checks if the given object is same as the current object.
/// </summary>
/// <param name="obj">Other object to compare against</param>
/// <returns>True if the other object is equal.</returns>
public override bool Equals(object obj)
{
if (!(obj is StorageLevel that))
{
return false;
}

return (UseDisk == that.UseDisk) && (UseMemory == that.UseMemory) &&
(UseOffHeap == that.UseOffHeap) && (Deserialized == that.Deserialized) &&
(Replication == that.Replication);
}

/// <summary>
/// Returns the hash code of the current object.
/// </summary>
/// <returns>The hash code of the current object</returns>
public override int GetHashCode() => base.GetHashCode();
}
}