Skip to content

Commit 4c49aa3

Browse files
committed
Added implementation for new openFile for tink enc
1 parent 0f7bab8 commit 4c49aa3

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
<google.tink.version>1.3.0-rc3</google.tink.version>
9090
<guava.version>27.0.1-jre</guava.version>
9191
<hadoop.version>3.3.6</hadoop.version>
92-
<hbase-shaded-client.version>1.4.13</hbase-shaded-client.version>
92+
<hbase-shaded-client.version>2.6.2-hadoop3</hbase-shaded-client.version>
9393
<hbase-shaded-server.version>1.4.13</hbase-shaded-server.version>
9494
<httpclient.version>4.5.13</httpclient.version>
9595
<jackson.core.version>2.13.4.2</jackson.core.version>

src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import org.apache.hadoop.fs.FSInputStream;
2323
import org.apache.hadoop.fs.FileSystem;
2424
import org.apache.hadoop.fs.FilterFileSystem;
25+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
2526
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.fs.PathHandle;
28+
import org.apache.hadoop.fs.impl.OpenFileParameters;
29+
import org.jetbrains.annotations.NotNull;
2630
import org.slf4j.Logger;
2731
import org.slf4j.LoggerFactory;
2832

@@ -32,6 +36,8 @@
3236
import java.nio.channels.Channels;
3337
import java.nio.channels.SeekableByteChannel;
3438
import java.util.Map;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CompletionException;
3541

3642
/**
3743
* A hadoop {@link FileSystem} that support files decryption (encryption is currently not supported).
@@ -42,6 +48,7 @@ public class EncryptedFileSystem extends FilterFileSystem {
4248
private static final String FS_SCHEME = CONF_PREFIX + "scheme";
4349
private static final String FS_IMPL = CONF_PREFIX + "impl";
4450
private static final String DECRYPTOR_IMPL = CONF_PREFIX + "decryptor.impl";
51+
private static final int DEFAULT_BUFFER_SIZE = 4096;
4552

4653
private static final Logger LOG = LoggerFactory.getLogger(EncryptedFileSystem.class);
4754

@@ -103,6 +110,82 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
103110
return new FSDataInputStream(new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
104111
}
105112

113+
/**
114+
* Opens a file asynchronously and returns a {@link FutureDataInputStreamBuilder}
115+
* to build a {@link FSDataInputStream} for the specified {@link Path}.
116+
*
117+
* <p>This implementation returns a builder that constructs an input stream by using a decryptor
118+
* to open the file through a {@link SeekableByteChannelFSInputStream}. The file is read
119+
* with a buffer size of 4096 bytes.</p>
120+
*
121+
* @param path the {@link Path} of the file to open
122+
* @return a {@link FutureDataInputStreamBuilder} that asynchronously builds a {@link FSDataInputStream}
123+
* @throws UnsupportedOperationException if the operation is not supported
124+
*/
125+
@Override
126+
public FutureDataInputStreamBuilder openFile(Path path) throws UnsupportedOperationException {
127+
return new FutureDataInputStreamBuilder() {
128+
@Override
129+
public CompletableFuture<FSDataInputStream> build()
130+
throws IllegalArgumentException, UnsupportedOperationException {
131+
return CompletableFuture.supplyAsync(() -> {
132+
try {
133+
return new FSDataInputStream(
134+
new SeekableByteChannelFSInputStream(decryptor.open(fs, path, DEFAULT_BUFFER_SIZE)));
135+
} catch (Exception e) {
136+
throw new CompletionException(e);
137+
}
138+
});
139+
}
140+
141+
@Override
142+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String s1) {
143+
return this;
144+
}
145+
146+
@Override
147+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String... strings) {
148+
return this;
149+
}
150+
151+
@Override
152+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String s1) {
153+
return this;
154+
}
155+
156+
@Override
157+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String... strings) {
158+
return this;
159+
}
160+
};
161+
}
162+
163+
/**
164+
* Opens a file asynchronously using the provided {@link Path}, and returns
165+
* a {@link CompletableFuture} that supplies a {@link FSDataInputStream}.
166+
*
167+
* <p>This method uses a decryptor to open the file and wraps it in a {@link SeekableByteChannelFSInputStream}.
168+
* It uses the buffer size specified in the {@code parameters}; if the buffer size is not greater than zero,
169+
* a default of 4096 bytes is used.</p>
170+
*
171+
* @param path the {@link Path} to the file to open
172+
* @param parameters the {@link OpenFileParameters} containing optional configuration, such as buffer size
173+
* @return a {@link CompletableFuture} that will complete with the {@link FSDataInputStream}
174+
* @throws CompletionException if an exception occurs during file opening
175+
*/
176+
@Override
177+
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path, OpenFileParameters parameters) {
178+
return CompletableFuture.supplyAsync(() -> {
179+
try {
180+
int bufferSize = parameters.getBufferSize() > 0 ? parameters.getBufferSize() : 4096;
181+
return new FSDataInputStream(
182+
new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
183+
} catch (Exception e) {
184+
throw new CompletionException(e);
185+
}
186+
});
187+
}
188+
106189
/**
107190
* A {@link FSInputStream} implementation backed by a {@link SeekableByteChannel}.
108191
*/

0 commit comments

Comments
 (0)