Skip to content

Commit 5800bd3

Browse files
feat: drop the deserialization errors in filter engine (#86)
* feat: drop the deserialization errors in filter engine
* test: add unit tests for deserialization error handling in filter engine
1 parent a70a2b5 commit 5800bd3

File tree

7 files changed

+222
-10
lines changed

7 files changed

+222
-10
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ lombok {
3333
}
3434

3535
group 'com.gotocompany'
36-
version '0.12.15'
36+
version '0.12.16'
3737

3838
def projName = "firehose"
3939

docs/docs/advance/filters.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ Time window in seconds for future timestamps. Messages with timestamps newer tha
7272

7373
## `FILTER_DROP_DESERIALIZATION_ERROR`
7474

75-
Whether to drop messages with deserialization errors when using timestamp filter.
75+
Whether to drop messages with protobuf deserialization errors instead of failing the entire batch. Applies to all filter types (JSON, JEXL, Timestamp) when processing protobuf messages.
7676

77-
* Example value: `false`
77+
* Example value: `true`
7878
* Type: `optional`
79-
* Default value: `true`
79+
* Default value: `false`
8080

src/main/java/com/gotocompany/firehose/config/FilterConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public interface FilterConfig extends Config {
3737
String getFilterTimestampFieldName();
3838

3939
@Key("FILTER_DROP_DESERIALIZATION_ERROR")
40-
@DefaultValue("true")
40+
@DefaultValue("false")
4141
Boolean getFilterDropDeserializationError();
4242

4343
@Key("FILTER_TIMESTAMP_PAST_WINDOW_SECONDS")

src/main/java/com/gotocompany/firehose/filter/jexl/JexlFilter.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.gotocompany.firehose.filter.jexl;
22

3+
import com.google.protobuf.InvalidProtocolBufferException;
34
import com.gotocompany.firehose.message.Message;
45
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
56
import com.gotocompany.firehose.config.FilterConfig;
@@ -27,9 +28,14 @@
2728
*/
2829
public class JexlFilter implements Filter {
2930

31+
private static final String METRIC_PREFIX = "firehose_jexl_filter_";
32+
private static final String DESERIALIZATION_ERRORS = METRIC_PREFIX + "deserialization_errors_total";
33+
3034
private final Expression expression;
3135
private final FilterDataSourceType filterDataSourceType;
3236
private final String protoSchema;
37+
private final boolean dropDeserializationError;
38+
private final FirehoseInstrumentation firehoseInstrumentation;
3339

3440
/**
3541
* Instantiates a new Message filter.
@@ -43,6 +49,8 @@ public JexlFilter(FilterConfig filterConfig, FirehoseInstrumentation firehoseIns
4349
engine.setStrict(true);
4450
this.filterDataSourceType = filterConfig.getFilterDataSource();
4551
this.protoSchema = filterConfig.getFilterSchemaProtoClass();
52+
this.dropDeserializationError = filterConfig.getFilterDropDeserializationError();
53+
this.firehoseInstrumentation = firehoseInstrumentation;
4654
firehoseInstrumentation.logInfo("\n\tFilter type: {}", this.filterDataSourceType);
4755
this.expression = engine.createExpression(filterConfig.getFilterJexlExpression());
4856
firehoseInstrumentation.logInfo("\n\tFilter schema: {}", this.protoSchema);
@@ -62,13 +70,25 @@ public FilteredMessages filter(List<Message> messages) throws FilterException {
6270
for (Message message : messages) {
6371
try {
6472
Object data = (filterDataSourceType.equals(FilterDataSourceType.KEY)) ? message.getLogKey() : message.getLogMessage();
65-
Object obj = MethodUtils.invokeStaticMethod(Class.forName(protoSchema), "parseFrom", data);
73+
Object obj;
74+
try {
75+
obj = MethodUtils.invokeStaticMethod(Class.forName(protoSchema), "parseFrom", data);
76+
} catch (InvocationTargetException e) {
77+
if (dropDeserializationError && e.getCause() instanceof InvalidProtocolBufferException) {
78+
firehoseInstrumentation.captureCount(DESERIALIZATION_ERRORS, 1L);
79+
firehoseInstrumentation.logWarn("Failed to deserialize protobuf message: {}", e.getCause().getMessage());
80+
filteredMessages.addToInvalidMessages(message);
81+
continue;
82+
} else {
83+
throw new FilterException("Failed while filtering EsbMessages", e);
84+
}
85+
}
6686
if (evaluate(obj)) {
6787
filteredMessages.addToValidMessages(message);
6888
} else {
6989
filteredMessages.addToInvalidMessages(message);
7090
}
71-
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
91+
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e) {
7292
throw new FilterException("Failed while filtering EsbMessages", e);
7393
}
7494
}

src/main/java/com/gotocompany/firehose/filter/json/JsonFilter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.fasterxml.jackson.databind.JsonNode;
55
import com.fasterxml.jackson.databind.ObjectMapper;
66
import com.google.protobuf.DynamicMessage;
7+
import com.google.protobuf.InvalidProtocolBufferException;
78
import com.google.protobuf.util.JsonFormat;
89
import com.gotocompany.firehose.message.Message;
910
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
@@ -31,12 +32,16 @@
3132
*/
3233
public class JsonFilter implements Filter {
3334

35+
private static final String METRIC_PREFIX = "firehose_json_filter_";
36+
private static final String DESERIALIZATION_ERRORS = METRIC_PREFIX + "deserialization_errors_total";
37+
3438
private final FilterConfig filterConfig;
3539
private final FirehoseInstrumentation firehoseInstrumentation;
3640
private final JsonSchema schema;
3741
private final ObjectMapper objectMapper = new ObjectMapper();
3842
private JsonFormat.Printer jsonPrinter;
3943
private Parser parser;
44+
private final boolean dropDeserializationError;
4045

4146
/**
4247
* Instantiates a new Json filter.
@@ -47,6 +52,7 @@ public class JsonFilter implements Filter {
4752
public JsonFilter(StencilClient stencilClient, FilterConfig filterConfig, FirehoseInstrumentation firehoseInstrumentation) {
4853
this.firehoseInstrumentation = firehoseInstrumentation;
4954
this.filterConfig = filterConfig;
55+
this.dropDeserializationError = filterConfig.getFilterDropDeserializationError();
5056
JsonSchemaFactory schemaFactory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7);
5157
this.schema = schemaFactory.getSchema(filterConfig.getFilterJsonSchema());
5258
if (filterConfig.getFilterESBMessageFormat() == FilterMessageFormatType.PROTOBUF) {
@@ -68,7 +74,7 @@ public FilteredMessages filter(List<Message> messages) throws FilterException {
6874
for (Message message : messages) {
6975
byte[] data = (filterConfig.getFilterDataSource().equals(KEY)) ? message.getLogKey() : message.getLogMessage();
7076
String jsonMessage = deserialize(data);
71-
if (evaluate(jsonMessage)) {
77+
if (jsonMessage != null && evaluate(jsonMessage)) {
7278
filteredMessages.addToValidMessages(message);
7379
} else {
7480
filteredMessages.addToInvalidMessages(message);
@@ -99,9 +105,14 @@ private String deserialize(byte[] data) throws FilterException {
99105
try {
100106
DynamicMessage message = parser.parse(data);
101107
return jsonPrinter.print(message);
102-
103108
} catch (Exception e) {
104-
throw new FilterException("Failed to parse Protobuf message", e);
109+
if (dropDeserializationError && (e instanceof InvalidProtocolBufferException || e.getCause() instanceof InvalidProtocolBufferException)) {
110+
firehoseInstrumentation.captureCount(DESERIALIZATION_ERRORS, 1L);
111+
firehoseInstrumentation.logWarn("Failed to deserialize protobuf message: {}", e.getMessage());
112+
return null;
113+
} else {
114+
throw new FilterException("Failed to parse Protobuf message", e);
115+
}
105116
}
106117
case JSON:
107118
return new String(data, Charset.defaultCharset());

src/test/java/com/gotocompany/firehose/filter/jexl/JexlFilterTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,96 @@ public void shouldLogFilterTypeIfFilterTypeIsNotNone() {
102102
Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("\n\tFilter schema: {}", TestMessage.class.getName());
103103
Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logInfo("\n\tFilter expression: {}", "testMessage.getOrderNumber() == 123");
104104
}
105+
106+
@Test
107+
public void shouldDropMessageWhenDeserializationFailsAndDropConfigEnabled() throws FilterException {
108+
Message invalidMessage = new Message(new byte[]{1, 2}, new byte[]{1, 2, 3}, "topic1", 0, 100);
109+
Message validMessage = new Message(key.toByteArray(), testMessage.toByteArray(), "topic1", 0, 101);
110+
Map<String, String> filterConfigs = new HashMap<>();
111+
filterConfigs.put("FILTER_DATA_SOURCE", "message");
112+
filterConfigs.put("FILTER_JEXL_EXPRESSION", "testMessage.getOrderNumber() == 123");
113+
filterConfigs.put("FILTER_SCHEMA_PROTO_CLASS", TestMessage.class.getName());
114+
filterConfigs.put("FILTER_DROP_DESERIALIZATION_ERROR", "true");
115+
kafkaConsumerConfig = ConfigFactory.create(FilterConfig.class, filterConfigs);
116+
filter = new JexlFilter(kafkaConsumerConfig, firehoseInstrumentation);
117+
FilteredMessages filteredMessages = filter.filter(Arrays.asList(invalidMessage, validMessage));
118+
assertEquals(1, filteredMessages.sizeOfValidMessages());
119+
assertEquals(1, filteredMessages.sizeOfInvalidMessages());
120+
Mockito.verify(firehoseInstrumentation, Mockito.times(1)).captureCount("firehose_jexl_filter_deserialization_errors_total", 1L);
121+
Mockito.verify(firehoseInstrumentation, Mockito.times(1)).logWarn(Mockito.eq("Failed to deserialize protobuf message: {}"), Mockito.any(String.class));
122+
}
123+
124+
@Test
125+
public void shouldThrowExceptionWhenDeserializationFailsAndDropConfigDisabled() throws FilterException {
126+
Message invalidMessage = new Message(new byte[]{1, 2}, new byte[]{1, 2, 3}, "topic1", 0, 100);
127+
Map<String, String> filterConfigs = new HashMap<>();
128+
filterConfigs.put("FILTER_DATA_SOURCE", "message");
129+
filterConfigs.put("FILTER_JEXL_EXPRESSION", "testMessage.getOrderNumber() == 123");
130+
filterConfigs.put("FILTER_SCHEMA_PROTO_CLASS", TestMessage.class.getName());
131+
filterConfigs.put("FILTER_DROP_DESERIALIZATION_ERROR", "false");
132+
kafkaConsumerConfig = ConfigFactory.create(FilterConfig.class, filterConfigs);
133+
filter = new JexlFilter(kafkaConsumerConfig, firehoseInstrumentation);
134+
try {
135+
filter.filter(Arrays.asList(invalidMessage));
136+
assertEquals("Expected FilterException to be thrown", true, false);
137+
} catch (FilterException e) {
138+
assertEquals("Failed while filtering EsbMessages", e.getMessage());
139+
}
140+
}
141+
142+
@Test
143+
public void shouldNotCaptureMetricsWhenDropConfigDisabled() throws FilterException {
144+
Message invalidMessage = new Message(new byte[]{1, 2}, new byte[]{1, 2, 3}, "topic1", 0, 100);
145+
Map<String, String> filterConfigs = new HashMap<>();
146+
filterConfigs.put("FILTER_DATA_SOURCE", "message");
147+
filterConfigs.put("FILTER_JEXL_EXPRESSION", "testMessage.getOrderNumber() == 123");
148+
filterConfigs.put("FILTER_SCHEMA_PROTO_CLASS", TestMessage.class.getName());
149+
filterConfigs.put("FILTER_DROP_DESERIALIZATION_ERROR", "false");
150+
kafkaConsumerConfig = ConfigFactory.create(FilterConfig.class, filterConfigs);
151+
filter = new JexlFilter(kafkaConsumerConfig, firehoseInstrumentation);
152+
try {
153+
filter.filter(Arrays.asList(invalidMessage));
154+
} catch (FilterException e) {
155+
}
156+
Mockito.verify(firehoseInstrumentation, Mockito.never()).captureCount(Mockito.eq("firehose_jexl_filter_deserialization_errors_total"), Mockito.any(Long.class));
157+
Mockito.verify(firehoseInstrumentation, Mockito.never()).logWarn(Mockito.eq("Failed to deserialize protobuf message: {}"), Mockito.any(String.class));
158+
}
159+
160+
@Test
161+
public void shouldDropMultipleInvalidMessagesAndCaptureCorrectMetrics() throws FilterException {
162+
Message invalidMessage1 = new Message(new byte[]{1, 2}, new byte[]{1, 2, 3}, "topic1", 0, 100);
163+
Message invalidMessage2 = new Message(new byte[]{4, 5}, new byte[]{4, 5, 6}, "topic1", 0, 101);
164+
Message validMessage = new Message(key.toByteArray(), testMessage.toByteArray(), "topic1", 0, 102);
165+
Map<String, String> filterConfigs = new HashMap<>();
166+
filterConfigs.put("FILTER_DATA_SOURCE", "message");
167+
filterConfigs.put("FILTER_JEXL_EXPRESSION", "testMessage.getOrderNumber() == 123");
168+
filterConfigs.put("FILTER_SCHEMA_PROTO_CLASS", TestMessage.class.getName());
169+
filterConfigs.put("FILTER_DROP_DESERIALIZATION_ERROR", "true");
170+
kafkaConsumerConfig = ConfigFactory.create(FilterConfig.class, filterConfigs);
171+
filter = new JexlFilter(kafkaConsumerConfig, firehoseInstrumentation);
172+
FilteredMessages filteredMessages = filter.filter(Arrays.asList(invalidMessage1, invalidMessage2, validMessage));
173+
assertEquals(1, filteredMessages.sizeOfValidMessages());
174+
assertEquals(2, filteredMessages.sizeOfInvalidMessages());
175+
Mockito.verify(firehoseInstrumentation, Mockito.times(2)).captureCount("firehose_jexl_filter_deserialization_errors_total", 1L);
176+
Mockito.verify(firehoseInstrumentation, Mockito.times(2)).logWarn(Mockito.eq("Failed to deserialize protobuf message: {}"), Mockito.any(String.class));
177+
}
178+
179+
@Test
180+
public void shouldStillThrowExceptionForNonDeserializationErrors() throws FilterException {
181+
Message message = new Message(key.toByteArray(), testMessage.toByteArray(), "topic1", 0, 100);
182+
Map<String, String> filterConfigs = new HashMap<>();
183+
filterConfigs.put("FILTER_DATA_SOURCE", "message");
184+
filterConfigs.put("FILTER_JEXL_EXPRESSION", "testMessage.getOrderNumber() == 123");
185+
filterConfigs.put("FILTER_SCHEMA_PROTO_CLASS", "com.invalid.ClassName");
186+
filterConfigs.put("FILTER_DROP_DESERIALIZATION_ERROR", "true");
187+
kafkaConsumerConfig = ConfigFactory.create(FilterConfig.class, filterConfigs);
188+
filter = new JexlFilter(kafkaConsumerConfig, firehoseInstrumentation);
189+
try {
190+
filter.filter(Arrays.asList(message));
191+
assertEquals("Expected FilterException to be thrown", true, false);
192+
} catch (FilterException e) {
193+
assertEquals("Failed while filtering EsbMessages", e.getMessage());
194+
}
195+
Mockito.verify(firehoseInstrumentation, Mockito.never()).captureCount(Mockito.eq("firehose_jexl_filter_deserialization_errors_total"), Mockito.any(Long.class));
196+
}
105197
}

0 commit comments

Comments
 (0)