
Commit 491de17

Expose Hadoop Configuration/FileSystem (#787)
1 parent d2822c3 commit 491de17

8 files changed: +150 −3 lines changed

src/csharp/Extensions/Microsoft.Spark.Extensions.Delta.E2ETest/Microsoft.Spark.Extensions.Delta.E2ETest.csproj

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 
   <PropertyGroup>
     <TargetFramework>netcoreapp3.1</TargetFramework>
-    <IsPackable>false</IsPackable>
   </PropertyGroup>
 
   <ItemGroup>

src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj

Lines changed: 0 additions & 1 deletion
@@ -3,7 +3,6 @@
   <PropertyGroup>
     <TargetFramework>netcoreapp3.1</TargetFramework>
     <RootNamespace>Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest</RootNamespace>
-    <IsPackable>false</IsPackable>
   </PropertyGroup>
 
   <ItemGroup>

src/csharp/Extensions/Microsoft.Spark.Extensions.Hyperspace.E2ETest/Microsoft.Spark.Extensions.Hyperspace.E2ETest.csproj

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 
   <PropertyGroup>
     <TargetFramework>netcoreapp3.1</TargetFramework>
-    <IsPackable>false</IsPackable>
   </PropertyGroup>
 
   <ItemGroup>

FileSystemTests.cs (new file)

Lines changed: 56 additions & 0 deletions

@@ -0,0 +1,56 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System.IO;
+using Microsoft.Spark.Hadoop.Fs;
+using Microsoft.Spark.Sql;
+using Microsoft.Spark.UnitTest.TestUtils;
+using Xunit;
+
+namespace Microsoft.Spark.E2ETest.Hadoop
+{
+    [Collection("Spark E2E Tests")]
+    public class FileSystemTests
+    {
+        private readonly SparkSession _spark;
+
+        public FileSystemTests(SparkFixture fixture)
+        {
+            _spark = fixture.Spark;
+        }
+
+        /// <summary>
+        /// Test that methods return the expected signature.
+        /// </summary>
+        [Fact]
+        public void TestSignatures()
+        {
+            using var tempDirectory = new TemporaryDirectory();
+
+            using FileSystem fs = FileSystem.Get(_spark.SparkContext.HadoopConfiguration());
+
+            Assert.IsType<bool>(fs.Delete(tempDirectory.Path, true));
+        }
+
+        /// <summary>
+        /// Test that Delete() deletes the file.
+        /// </summary>
+        [Fact]
+        public void TestDelete()
+        {
+            using FileSystem fs = FileSystem.Get(_spark.SparkContext.HadoopConfiguration());
+
+            using var tempDirectory = new TemporaryDirectory();
+            string path = Path.Combine(tempDirectory.Path, "temp-table");
+            _spark.Range(25).Write().Format("parquet").Save(path);
+
+            Assert.True(Directory.Exists(path));
+
+            Assert.True(fs.Delete(path, true));
+            Assert.False(fs.Delete(path, true));
+
+            Assert.False(Directory.Exists(path));
+        }
+    }
+}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information.
 
 using System;
+using Microsoft.Spark.Hadoop.Conf;
 using Microsoft.Spark.UnitTest.TestUtils;
 using Xunit;
 
@@ -42,6 +43,8 @@ public void TestSignaturesV2_3_X()
 
             using var tempDir = new TemporaryDirectory();
             sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);
+
+            Assert.IsType<Configuration>(sc.HadoopConfiguration());
         }
     }
 }

Configuration.cs (new file)

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using Microsoft.Spark.Interop.Ipc;
+
+namespace Microsoft.Spark.Hadoop.Conf
+{
+    /// <summary>
+    /// Provides access to configuration parameters.
+    /// </summary>
+    public class Configuration : IJvmObjectReferenceProvider
+    {
+        private readonly JvmObjectReference _jvmObject;
+
+        internal Configuration(JvmObjectReference jvmObject)
+        {
+            _jvmObject = jvmObject;
+        }
+
+        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
+    }
+}

FileSystem.cs (new file)

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using Microsoft.Spark.Hadoop.Conf;
+using Microsoft.Spark.Interop;
+using Microsoft.Spark.Interop.Ipc;
+
+namespace Microsoft.Spark.Hadoop.Fs
+{
+    /// <summary>
+    /// A fairly generic filesystem. It may be implemented as a distributed filesystem, or as a "local" one
+    /// that reflects the locally-connected disk. The local version exists for small Hadoop instances and for
+    /// testing.
+    ///
+    /// All user code that may potentially use the Hadoop Distributed File System should be written to use a
+    /// FileSystem object. The Hadoop DFS is a multi-machine system that appears as a single disk. It's
+    /// useful because of its fault tolerance and potentially very large capacity.
+    /// </summary>
+    public class FileSystem : IJvmObjectReferenceProvider, IDisposable
+    {
+        private readonly JvmObjectReference _jvmObject;
+
+        internal FileSystem(JvmObjectReference jvmObject)
+        {
+            _jvmObject = jvmObject;
+        }
+
+        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
+
+        /// <summary>
+        /// Returns the configured <see cref="FileSystem"/>.
+        /// </summary>
+        /// <param name="conf">The configuration to use.</param>
+        /// <returns>The FileSystem.</returns>
+        public static FileSystem Get(Configuration conf) =>
+            new FileSystem((JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
+                "org.apache.hadoop.fs.FileSystem",
+                "get",
+                conf));
+
+        /// <summary>
+        /// Delete a file.
+        /// </summary>
+        /// <param name="path">The path to delete.</param>
+        /// <param name="recursive">If path is a directory and set to true, the directory is deleted else
+        /// throws an exception. In case of a file the recursive can be set to either true or false.</param>
+        /// <returns>True if delete is successful else false.</returns>
+        public bool Delete(string path, bool recursive = true)
+        {
+            JvmObjectReference pathObject =
+                SparkEnvironment.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", path);
+
+            return (bool)_jvmObject.Invoke("delete", pathObject, recursive);
+        }
+
+        public void Dispose() => _jvmObject.Invoke("close");
+    }
+}
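
Taken together, the commit lets .NET driver code obtain the active Hadoop Configuration and work with the corresponding Hadoop FileSystem directly. A minimal usage sketch, assuming an active SparkSession stored in a variable named spark (the output path below is illustrative, not part of the commit):

    using Microsoft.Spark.Hadoop.Conf;
    using Microsoft.Spark.Hadoop.Fs;
    using Microsoft.Spark.Sql;

    // Reuse the driver's Hadoop configuration instead of constructing a new one.
    Configuration conf = spark.SparkContext.HadoopConfiguration();

    // FileSystem implements IDisposable; Dispose() closes the underlying Hadoop FileSystem.
    using FileSystem fs = FileSystem.Get(conf);

    // Delete returns true if the path existed and was removed, false otherwise.
    bool removed = fs.Delete("/tmp/output/temp-table", recursive: true);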

src/csharp/Microsoft.Spark/SparkContext.cs

Lines changed: 8 additions & 0 deletions
@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.IO;
 using System.Runtime.Serialization.Formatters.Binary;
+using Microsoft.Spark.Hadoop.Conf;
 using Microsoft.Spark.Interop.Ipc;
 using static Microsoft.Spark.Utils.CommandSerDe;
 
@@ -312,6 +313,13 @@ public Broadcast<T> Broadcast<T>(T value)
             return new Broadcast<T>(this, value);
         }
 
+        /// <summary>
+        /// A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
+        /// </summary>
+        /// <returns>The Hadoop Configuration.</returns>
+        public Configuration HadoopConfiguration() =>
+            new Configuration((JvmObjectReference)_jvmObject.Invoke("hadoopConfiguration"));
+
         /// <summary>
         /// Returns JVM object reference to JavaRDD object transformed
         /// from a Scala RDD object.