Skip to content

Commit 7d0d9db

Browse files
champ-isaac authored and fhussonnois committed
feat(filesystems): keep subfolders path for AmazonS3MovePolicy
After processing files and moving to the success/failure folders, keep source prefix path for these processed files
1 parent 2b45914 commit 7d0d9db

File tree

2 files changed

+146
-5
lines changed

2 files changed

+146
-5
lines changed

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
*/
77
package io.streamthoughts.kafka.connect.filepulse.fs.clean;
88

9+
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG;
910
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_BUCKET_NAME_CONFIG;
11+
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH;
1012
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG;
1113
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_BUCKET_NAME_CONFIG;
14+
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH;
1215
import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG;
1316

1417
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
@@ -29,7 +32,10 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy {
2932
private static final Logger LOG = LoggerFactory.getLogger(AmazonS3MoveCleanupPolicy.class);
3033

3134
private AmazonS3Storage storage;
32-
35+
36+
private boolean includeSuccessSourcePrefixPath;
37+
private boolean includeFailuresSourcePrefixPath;
38+
3339
private Config config;
3440

3541
/**
@@ -38,27 +44,38 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy {
3844
@Override
3945
public void configure(final Map<String, ?> configs) {
4046
this.config = new Config(configs);
47+
this.includeSuccessSourcePrefixPath = this.config.getBoolean(SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH);
48+
this.includeFailuresSourcePrefixPath = this.config.getBoolean(FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH);
4149
}
4250

4351
/**
4452
* {@inheritDoc}
4553
*/
4654
@Override
4755
public boolean onSuccess(final FileObject source) {
48-
return move(source, SUCCESS_AWS_BUCKET_NAME_CONFIG, SUCCESS_AWS_PREFIX_PATH_CONFIG);
56+
return move(
57+
source,
58+
SUCCESS_AWS_BUCKET_NAME_CONFIG,
59+
SUCCESS_AWS_PREFIX_PATH_CONFIG,
60+
includeSuccessSourcePrefixPath);
4961
}
5062

5163
/**
5264
* {@inheritDoc}
5365
*/
5466
@Override
5567
public boolean onFailure(final FileObject source) {
56-
return move(source, FAILURES_AWS_BUCKET_NAME_CONFIG, FAILURES_AWS_PREFIX_PATH_CONFIG);
68+
return move(
69+
source,
70+
FAILURES_AWS_BUCKET_NAME_CONFIG,
71+
FAILURES_AWS_PREFIX_PATH_CONFIG,
72+
includeFailuresSourcePrefixPath);
5773
}
5874

5975
private boolean move(final FileObject source,
6076
final String destinationS3BucketConfig,
61-
final String destinationS3PrefixConfig) {
77+
final String destinationS3PrefixConfig,
78+
final boolean includeSourcePrefixPath) {
6279
checkState();
6380
URI sourceURI = source.metadata().uri();
6481
if (!storage.exists(sourceURI)) {
@@ -67,18 +84,35 @@ private boolean move(final FileObject source,
6784
}
6885
S3BucketKey sourceBucketKey = S3BucketKey.fromURI(sourceURI);
6986

87+
String relativeSourcePrefix = extractPrefix(
88+
sourceBucketKey.key().replaceAll(sourceBucketKey.objectName(), ""));
89+
String newObjectKey = includeSourcePrefixPath ?
90+
relativeSourcePrefix + sourceBucketKey.objectName() : sourceBucketKey.objectName();
91+
7092
var destS3BucketName = Optional
7193
.ofNullable(config.getString(destinationS3BucketConfig))
7294
.orElse(sourceBucketKey.bucketName());
7395

7496
var destBucketKey = new S3BucketKey(
7597
destS3BucketName,
7698
config.getString(destinationS3PrefixConfig),
77-
sourceBucketKey.objectName()
99+
newObjectKey
78100
);
79101
return storage.move(sourceURI, destBucketKey.toURI());
80102
}
81103

104+
private String extractPrefix(final String p) {
105+
String excludeSourcePrefixPath = Optional
106+
.ofNullable(config.getString(EXCLUDE_SOURCE_PREFIX_PATH_CONFIG))
107+
.orElse("");
108+
String prefix = p.replaceAll(excludeSourcePrefixPath, "");
109+
prefix = prefix.replaceAll("^/+", "");
110+
// if there are no subdirectories, return an empty string
111+
if (prefix.length() == 0) {
112+
return "";
113+
}
114+
return prefix.endsWith("/") ? prefix : prefix + "/";
115+
}
82116
/**
83117
* {@inheritDoc}
84118
*/
@@ -110,6 +144,14 @@ public static class Config extends AbstractConfig {
110144
private static final String SUCCESS_AWS_PREFIX_PATH_DOC =
111145
"The prefix to be used for defining the key of an S3 object to move into the destination bucket.";
112146

147+
public static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH =
148+
CONFIG_PREFIX + "success.aws.include.source.prefix.path";
149+
private static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC =
150+
"Indicates whether to include the source prefix path in the destination key.";
151+
public static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH =
152+
CONFIG_PREFIX + "failure.aws.include.source.prefix.path";
153+
private static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC =
154+
"Indicates whether to include the source prefix path in the destination key.";
113155
public static final String FAILURES_AWS_BUCKET_NAME_CONFIG =
114156
CONFIG_PREFIX + "failure.aws.bucket.name";
115157
private static final String FAILURES_AWS_BUCKET_NAME_DOC =
@@ -120,6 +162,11 @@ public static class Config extends AbstractConfig {
120162
private static final String FAILURES_AWS_PREFIX_PATH_DOC =
121163
"The prefix to be used for defining the key of S3 object to move into the destination bucket.";
122164

165+
public static final String EXCLUDE_SOURCE_PREFIX_PATH_CONFIG =
166+
CONFIG_PREFIX + "exclude.source.prefix.path";
167+
private static final String EXCLUDE_SOURCE_PREFIX_PATH_DOC =
168+
"Indicates whether to exclude the source prefix path from the destination key.";
169+
123170
/**
124171
* Creates a new {@link Config} instance.
125172
*/
@@ -152,6 +199,17 @@ static ConfigDef configDef() {
152199
ConfigDef.Width.NONE,
153200
SUCCESS_AWS_PREFIX_PATH_CONFIG
154201
)
202+
.define(
203+
SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH,
204+
ConfigDef.Type.BOOLEAN,
205+
false,
206+
ConfigDef.Importance.LOW,
207+
SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC,
208+
CONFIG_GROUP,
209+
groupCounter++,
210+
ConfigDef.Width.NONE,
211+
SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH
212+
)
155213
.define(
156214
FAILURES_AWS_BUCKET_NAME_CONFIG,
157215
ConfigDef.Type.STRING,
@@ -173,6 +231,28 @@ static ConfigDef configDef() {
173231
groupCounter++,
174232
ConfigDef.Width.NONE,
175233
FAILURES_AWS_PREFIX_PATH_CONFIG
234+
)
235+
.define(
236+
FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH,
237+
ConfigDef.Type.BOOLEAN,
238+
false,
239+
ConfigDef.Importance.LOW,
240+
FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC,
241+
CONFIG_GROUP,
242+
groupCounter++,
243+
ConfigDef.Width.NONE,
244+
FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH
245+
)
246+
.define(
247+
EXCLUDE_SOURCE_PREFIX_PATH_CONFIG,
248+
ConfigDef.Type.STRING,
249+
null,
250+
ConfigDef.Importance.LOW,
251+
EXCLUDE_SOURCE_PREFIX_PATH_DOC,
252+
CONFIG_GROUP,
253+
groupCounter++,
254+
ConfigDef.Width.NONE,
255+
EXCLUDE_SOURCE_PREFIX_PATH_CONFIG
176256
);
177257
}
178258
}

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public class AmazonS3MoveCleanupPolicyTest extends BaseAmazonS3Test {
2222
public static final String S3_TEST_BUCKET = "bucket";
2323
public static final String OBJECT_NAME = "object";
2424
public static final String S3_OBJECT_KEY = "input/" + OBJECT_NAME;
25+
public static final String S3_OBJECT_KEY_WITH_PREFIX = "input/prefix/" + OBJECT_NAME;
26+
public static final String EXCLUDE_SOURCE_PREFIX_PATH = "input";
2527

2628
private AmazonS3Storage storage;
2729

@@ -58,7 +60,36 @@ public void should_move_object_on_success() {
5860
Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/" + OBJECT_NAME).toURI()));
5961
}
6062

63+
@Test
64+
public void should_move_object_on_success_with_prefix() {
65+
// GIVEN
66+
client.createBucket(S3_TEST_BUCKET);
67+
client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents");
6168

69+
var cleaner = new AmazonS3MoveCleanupPolicy();
70+
cleaner.setStorage(storage);
71+
cleaner.configure(Map.of(
72+
AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/",
73+
AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/",
74+
AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true,
75+
AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true,
76+
AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH
77+
));
78+
79+
// WHEN
80+
FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX));
81+
cleaner.onSuccess(new FileObject(
82+
objectMetadata,
83+
FileObjectOffset.empty(),
84+
FileObjectStatus.COMPLETED
85+
)
86+
);
87+
88+
// THEN
89+
Assert.assertFalse(storage.exists(objectMetadata.uri()));
90+
Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/prefix/" + OBJECT_NAME).toURI()));
91+
}
92+
6293
@Test
6394
public void should_move_object_on_failure() {
6495
// GIVEN
@@ -85,4 +116,34 @@ public void should_move_object_on_failure() {
85116
Assert.assertFalse(storage.exists(objectMetadata.uri()));
86117
Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/" + OBJECT_NAME).toURI()));
87118
}
119+
120+
@Test
121+
public void should_move_object_on_failure_with_prefix() {
122+
// GIVEN
123+
client.createBucket(S3_TEST_BUCKET);
124+
client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents");
125+
126+
var cleaner = new AmazonS3MoveCleanupPolicy();
127+
cleaner.setStorage(storage);
128+
cleaner.configure(Map.of(
129+
AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/",
130+
AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/",
131+
AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true,
132+
AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true,
133+
AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH
134+
));
135+
136+
// WHEN
137+
FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX));
138+
cleaner.onFailure(new FileObject(
139+
objectMetadata,
140+
FileObjectOffset.empty(),
141+
FileObjectStatus.COMPLETED
142+
)
143+
);
144+
145+
// THEN
146+
Assert.assertFalse(storage.exists(objectMetadata.uri()));
147+
Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/prefix/" + OBJECT_NAME).toURI()));
148+
}
88149
}

0 commit comments

Comments
 (0)