Skip to content

Commit 6a1e57c

Browse files
feat: configurable timezone for dlq blob file partitioning (#90)
* feat: configurable timezone for dlq blob file partitioning * chore: update firehose version to 0.12.18 * fix: fix checkstyle errors * docs: add documentation for dlq file partitioning timezone config * feat: handle npe gracefully and cache timezone * refactor: remove fallback to utc and use converter design pattern * test: add unit tests for dlq file partitioning timezone * test: add unit tests for dlq file partitioning timezone * test: add unit tests for dlq file partitioning timezone * test: add unit tests for dlq file partitioning timezone * test: add unit tests for dlq file partitioning timezone * test: add unit tests for dlq file partitioning timezone
1 parent a1ca71f commit 6a1e57c

File tree

9 files changed

+466
-27
lines changed

9 files changed

+466
-27
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ lombok {
3333
}
3434

3535
group 'com.gotocompany'
36-
version '0.12.17'
36+
version '0.12.18'
3737

3838
def projName = "firehose"
3939

docs/docs/advance/dlq.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,35 @@ If the writer type is set to BLOB_STORAGE, we can choose any blob storage. Curre
3838
* Type: `optional`
3939
* Default value: `GCS`
4040

41+
## `DLQ_BLOB_FILE_PARTITION_TIMEZONE`
42+
43+
Timezone to be used for date-based partitioning of DLQ files when using BLOB_STORAGE writer type. DLQ files are organized into directories based on the consume timestamp of the message converted to the specified timezone. The configuration accepts standard timezone identifiers and will fail application startup if an invalid timezone is provided.
44+
45+
* Example value: `Asia/Tokyo`
46+
* Type: `optional`
47+
* Default value: `UTC`
48+
49+
### Valid Timezone Formats
50+
51+
#### 1. IANA Timezone Identifiers
52+
Handles daylight saving time transitions automatically.
53+
* Example: `Asia/Jakarta` - Western Indonesia Time (UTC+7)
54+
* Format: `Continent/City` pattern (e.g., `America/New_York`, `Europe/London`, `Australia/Sydney`)
55+
56+
#### 2. UTC Offset Formats
57+
Fixed offset from UTC.
58+
* Example: `+05:30` - 5 hours 30 minutes ahead of UTC
59+
* Format: `±HH:MM` or `UTC±H` (e.g., `-08:00`, `UTC+9`)
60+
61+
#### 3. UTC Variants
62+
* Example: `UTC` - Coordinated Universal Time
63+
* Supported: `UTC`, `GMT`, `Z`
64+
65+
#### 4. Legacy Timezone IDs
66+
* Example: `JST` - Japan Standard Time
67+
* Supported: `EST`, `PST`, `JST`
68+
69+
4170
## `DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID`
4271

4372
* Example value: `my-project-id`

src/main/java/com/gotocompany/firehose/config/DlqConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
import com.gotocompany.firehose.config.converter.BlobStorageTypeConverter;
44
import com.gotocompany.firehose.config.converter.DlqWriterTypeConverter;
5+
import com.gotocompany.firehose.config.converter.TimeZoneConverter;
56
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;
67
import com.gotocompany.firehose.sink.dlq.DLQWriterType;
78

9+
import java.time.ZoneId;
10+
811
public interface DlqConfig extends AppConfig {
912

1013
@Key("DLQ_WRITER_TYPE")
@@ -29,4 +32,9 @@ public interface DlqConfig extends AppConfig {
2932
@DefaultValue("false")
3033
boolean getDlqSinkEnable();
3134

35+
@Key("DLQ_BLOB_FILE_PARTITION_TIMEZONE")
36+
@DefaultValue("UTC")
37+
@ConverterClass(TimeZoneConverter.class)
38+
ZoneId getDlqBlobFilePartitionTimezone();
39+
3240
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.gotocompany.firehose.config.converter;
2+
3+
import com.gotocompany.firehose.exception.ConfigurationException;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.aeonbits.owner.Converter;
6+
7+
import java.lang.reflect.Method;
8+
import java.time.ZoneId;
9+
import java.time.zone.ZoneRulesException;
10+
11+
@Slf4j
12+
public class TimeZoneConverter implements Converter<ZoneId> {
13+
private static final String DEFAULT_TIMEZONE = "UTC";
14+
private static final ZoneId DEFAULT_ZONE_ID = ZoneId.of(DEFAULT_TIMEZONE);
15+
16+
@Override
17+
public ZoneId convert(Method method, String input) {
18+
try {
19+
if (input == null || input.trim().isEmpty()) {
20+
log.info("DLQ blob file partition timezone configuration is null or empty, using default timezone: {}", DEFAULT_TIMEZONE);
21+
return DEFAULT_ZONE_ID;
22+
}
23+
24+
String trimmedTimezone = input.trim();
25+
ZoneId zoneId = ZoneId.of(trimmedTimezone);
26+
27+
log.info("DLQ blob file partition timezone configuration validated successfully: '{}'", zoneId.getId());
28+
29+
if (!zoneId.getId().equals(trimmedTimezone)) {
30+
log.warn("DLQ blob file partition timezone '{}' was normalized to '{}' during validation",
31+
trimmedTimezone, zoneId.getId());
32+
}
33+
34+
return zoneId;
35+
36+
} catch (ZoneRulesException e) {
37+
String errorMessage = String.format(
38+
"Invalid DLQ blob file partition timezone configuration '%s'. Please provide a valid timezone identifier. Error: %s",
39+
input, e.getMessage());
40+
log.error(errorMessage);
41+
throw new ConfigurationException(errorMessage, e);
42+
43+
} catch (Exception e) {
44+
String errorMessage = String.format(
45+
"Unexpected error during DLQ blob file partition timezone configuration validation. Error: %s",
46+
e.getMessage());
47+
log.error(errorMessage);
48+
throw new ConfigurationException(errorMessage, e);
49+
}
50+
}
51+
}

src/main/java/com/gotocompany/firehose/sink/dlq/DlqWriterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static DlqWriter create(Map<String, String> configuration, StatsDReporter
4949
throw new IllegalArgumentException("DLQ Blob Storage type " + dlqConfig.getBlobStorageType() + "is not supported");
5050
}
5151
BlobStorage blobStorage = BlobStorageFactory.createObjectStorage(dlqConfig.getBlobStorageType(), configuration);
52-
return new BlobStorageDlqWriter(blobStorage);
52+
return new BlobStorageDlqWriter(blobStorage, dlqConfig);
5353
case LOG:
5454
return new LogDlqWriter(new FirehoseInstrumentation(client, LogDlqWriter.class));
5555

src/main/java/com/gotocompany/firehose/sink/dlq/blobstorage/BlobStorageDlqWriter.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.gotocompany.firehose.config.DlqConfig;
56
import com.gotocompany.firehose.message.Message;
67
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
78
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageException;
@@ -27,15 +28,18 @@
2728
public class BlobStorageDlqWriter implements DlqWriter {
2829
private final BlobStorage blobStorage;
2930
private final ObjectMapper objectMapper;
31+
private final DlqConfig dlqConfig;
3032

31-
public BlobStorageDlqWriter(BlobStorage blobStorage) {
33+
public BlobStorageDlqWriter(BlobStorage blobStorage, DlqConfig dlqConfig) {
3234
this.blobStorage = blobStorage;
3335
this.objectMapper = new ObjectMapper();
36+
this.dlqConfig = dlqConfig;
3437
}
3538

3639
@Override
3740
public List<Message> write(List<Message> messages) throws IOException {
38-
Map<Path, List<Message>> messagesByPartition = messages.stream().collect(Collectors.groupingBy(this::createPartition));
41+
Map<Path, List<Message>> messagesByPartition = messages.stream()
42+
.collect(Collectors.groupingBy(this::createPartition));
3943
List<Message> failedMessages = new LinkedList<>();
4044
messagesByPartition.forEach((path, partitionedMessages) -> {
4145
String data = partitionedMessages.stream().map(this::convertToString).collect(Collectors.joining("\n"));
@@ -53,25 +57,36 @@ public List<Message> write(List<Message> messages) throws IOException {
5357

5458
private String convertToString(Message message) {
5559
try {
60+
String errorString = "";
61+
String errorType = "";
62+
if (message.getErrorInfo() != null) {
63+
errorString = message.getErrorInfo().toString();
64+
errorType = message.getErrorInfo().getErrorType().name();
65+
}
66+
5667
return objectMapper.writeValueAsString(new DlqMessage(
57-
Base64.getEncoder().encodeToString(message.getLogKey() == null ? "".getBytes() : message.getLogKey()),
58-
Base64.getEncoder().encodeToString(message.getLogMessage() == null ? "".getBytes() : message.getLogMessage()),
68+
Base64.getEncoder()
69+
.encodeToString(message.getLogKey() == null ? "".getBytes() : message.getLogKey()),
70+
Base64.getEncoder()
71+
.encodeToString(message.getLogMessage() == null ? "".getBytes() : message.getLogMessage()),
5972
message.getTopic(),
6073
message.getPartition(),
6174
message.getOffset(),
6275
message.getTimestamp(),
63-
message.getErrorInfo().toString(),
64-
message.getErrorInfo().getErrorType().name()));
76+
errorString,
77+
errorType));
6578
} catch (JsonProcessingException e) {
6679
log.warn("Not able to convert message into json", e);
6780
return "";
6881
}
6982
}
7083

7184
private Path createPartition(Message message) {
85+
ZoneId zoneId = dlqConfig.getDlqBlobFilePartitionTimezone();
7286
LocalDate consumeLocalDate = LocalDate.from(Instant.ofEpochMilli(message.getConsumeTimestamp())
73-
.atZone(ZoneId.of("UTC")));
87+
.atZone(zoneId));
7488
String consumeDate = DateTimeFormatter.ISO_LOCAL_DATE.format(consumeLocalDate);
7589
return Paths.get(message.getTopic(), consumeDate);
7690
}
91+
7792
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package com.gotocompany.firehose.config;
2+
3+
import com.gotocompany.firehose.exception.ConfigurationException;
4+
import org.aeonbits.owner.ConfigFactory;
5+
import org.junit.Test;
6+
7+
import java.time.ZoneId;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
11+
import static org.junit.Assert.assertEquals;
12+
13+
public class DlqConfigTest {
14+
15+
@Test
16+
public void shouldLoadValidTimezoneFromConfiguration() {
17+
Map<String, String> configMap = new HashMap<>();
18+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "Asia/Tokyo");
19+
20+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
21+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
22+
23+
assertEquals(ZoneId.of("Asia/Tokyo"), result);
24+
}
25+
26+
@Test
27+
public void shouldUseDefaultTimezoneWhenConfigNotProvided() {
28+
Map<String, String> configMap = new HashMap<>();
29+
30+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
31+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
32+
33+
assertEquals(ZoneId.of("UTC"), result);
34+
}
35+
36+
@Test
37+
public void shouldLoadSystemTimezone() {
38+
Map<String, String> configMap = new HashMap<>();
39+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "America/Chicago");
40+
41+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
42+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
43+
44+
assertEquals(ZoneId.of("America/Chicago"), result);
45+
}
46+
47+
@Test
48+
public void shouldHandleEmptyConfigurationValue() {
49+
Map<String, String> configMap = new HashMap<>();
50+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "");
51+
52+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
53+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
54+
55+
assertEquals(ZoneId.of("UTC"), result);
56+
}
57+
58+
@Test
59+
public void shouldHandleWhitespaceConfigurationValue() {
60+
Map<String, String> configMap = new HashMap<>();
61+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", " ");
62+
63+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
64+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
65+
66+
assertEquals(ZoneId.of("UTC"), result);
67+
}
68+
69+
@Test
70+
public void shouldTrimWhitespaceAroundValidTimezone() {
71+
Map<String, String> configMap = new HashMap<>();
72+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", " Europe/London ");
73+
74+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
75+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
76+
77+
assertEquals(ZoneId.of("Europe/London"), result);
78+
}
79+
80+
@Test(expected = ConfigurationException.class)
81+
public void shouldFailForInvalidTimezoneAtConfigLoad() {
82+
Map<String, String> configMap = new HashMap<>();
83+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "Invalid/Timezone");
84+
85+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
86+
config.getDlqBlobFilePartitionTimezone();
87+
}
88+
89+
@Test(expected = ConfigurationException.class)
90+
public void shouldFailForMalformedTimezoneAtConfigLoad() {
91+
Map<String, String> configMap = new HashMap<>();
92+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "NotAValidTimezone");
93+
94+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
95+
config.getDlqBlobFilePartitionTimezone();
96+
}
97+
98+
@Test
99+
public void shouldLoadOffsetBasedTimezone() {
100+
Map<String, String> configMap = new HashMap<>();
101+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "+05:30");
102+
103+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
104+
ZoneId result = config.getDlqBlobFilePartitionTimezone();
105+
106+
assertEquals(ZoneId.of("+05:30"), result);
107+
}
108+
109+
@Test(expected = ConfigurationException.class)
110+
public void shouldRejectUnsupportedLegacyTimezoneIds() {
111+
Map<String, String> configMap = new HashMap<>();
112+
configMap.put("DLQ_BLOB_FILE_PARTITION_TIMEZONE", "EST");
113+
114+
DlqConfig config = ConfigFactory.create(DlqConfig.class, configMap);
115+
config.getDlqBlobFilePartitionTimezone();
116+
}
117+
}

0 commit comments

Comments
 (0)