Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package com.gotocompany.firehose.config;

import com.gotocompany.firehose.config.converter.HttpSinkSerializerJsonTypecastConfigConverter;
import com.gotocompany.firehose.config.converter.*;
import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterPlacementType;
import com.gotocompany.firehose.config.enums.HttpSinkParameterSourceType;
import com.gotocompany.firehose.config.enums.HttpSinkRequestMethodType;
import com.gotocompany.firehose.config.converter.HttpSinkRequestMethodConverter;
import com.gotocompany.firehose.config.converter.HttpSinkParameterDataFormatConverter;
import com.gotocompany.firehose.config.converter.HttpSinkParameterPlacementTypeConverter;
import com.gotocompany.firehose.config.converter.HttpSinkParameterSourceTypeConverter;
import com.gotocompany.firehose.config.converter.RangeToHashMapConverter;
import com.jayway.jsonpath.Option;

import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -80,6 +76,11 @@ public interface HttpSinkConfig extends AppConfig {
@DefaultValue("")
String getSinkHttpJsonBodyTemplate();

@Key("SINK_HTTP_JSON_BODY_TEMPLATE_PARSE_OPTION")
@DefaultValue("")
@ConverterClass(HttpJsonBodyTemplateParseOptionConverter.class)
Option getSinkHttpJsonBodyTemplateParseOption();

@Key("SINK_HTTP_PARAMETER_PLACEMENT")
@DefaultValue("header")
@ConverterClass(HttpSinkParameterPlacementTypeConverter.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gotocompany.firehose.config.converter;

import com.jayway.jsonpath.Option;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;

public class HttpJsonBodyTemplateParseOptionConverter implements Converter<Option> {
@Override
public Option convert(Method method, String input) {
return Option.valueOf(input);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.PathNotFoundException;
import com.gotocompany.stencil.Parser;
import org.json.simple.parser.JSONParser;
Expand All @@ -32,10 +34,11 @@ public class MessageToTemplatizedJson implements MessageSerializer {
private Parser protoParser;
private HashSet<String> pathsToReplace;
private JSONParser jsonParser;
private final Configuration jsonPathConfig;
private FirehoseInstrumentation firehoseInstrumentation;

public static MessageToTemplatizedJson create(FirehoseInstrumentation firehoseInstrumentation, String httpSinkJsonBodyTemplate, Parser protoParser) {
MessageToTemplatizedJson messageToTemplatizedJson = new MessageToTemplatizedJson(firehoseInstrumentation, httpSinkJsonBodyTemplate, protoParser);
public static MessageToTemplatizedJson create(FirehoseInstrumentation firehoseInstrumentation, String httpSinkJsonBodyTemplate, Parser protoParser, Option option) {
MessageToTemplatizedJson messageToTemplatizedJson = new MessageToTemplatizedJson(firehoseInstrumentation, httpSinkJsonBodyTemplate, protoParser, option);
if (messageToTemplatizedJson.isInvalidJson()) {
throw new ConfigurationException("Given HTTPSink JSON body template :"
+ httpSinkJsonBodyTemplate
Expand All @@ -45,11 +48,12 @@ public static MessageToTemplatizedJson create(FirehoseInstrumentation firehoseIn
return messageToTemplatizedJson;
}

public MessageToTemplatizedJson(FirehoseInstrumentation firehoseInstrumentation, String httpSinkJsonBodyTemplate, Parser protoParser) {
public MessageToTemplatizedJson(FirehoseInstrumentation firehoseInstrumentation, String httpSinkJsonBodyTemplate, Parser protoParser, Option option) {
this.httpSinkJsonBodyTemplate = httpSinkJsonBodyTemplate;
this.protoParser = protoParser;
this.jsonParser = new JSONParser();
this.gson = new Gson();
this.jsonPathConfig = Configuration.defaultConfiguration().addOptions(option);
this.firehoseInstrumentation = firehoseInstrumentation;
}

Expand Down Expand Up @@ -90,6 +94,22 @@ public String serialize(Message message) throws DeserializerException {
}
finalMessage = finalMessage.replace(path, jsonString);
}

for (String path : pathsToReplace) {
if (path.equals(ALL_FIELDS_FROM_TEMPLATE)) {
jsonString = jsonMessage;
} else {
Object element = JsonPath.using(jsonPathConfig).parse(jsonMessage).read(path.replaceAll("\"", ""));
if (element == null && jsonPathConfig.getOptions().contains(Option.SUPPRESS_EXCEPTIONS)) {
firehoseInstrumentation.logWarn("Missing value for path: {}", path);
jsonString = "";
} else {
jsonString = gson.toJson(element);
}
}
finalMessage = finalMessage.replace(path, jsonString);
}

return finalMessage;
} catch (InvalidProtocolBufferException | PathNotFoundException e) {
throw new DeserializerException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public MessageSerializer build() {
} else {
firehoseInstrumentation.logDebug("Serializer type: EsbMessageToTemplatizedJson");
return getTypecastedJsonSerializer(
MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser));
MessageToTemplatizedJson.create(new FirehoseInstrumentation(statsDReporter, MessageToTemplatizedJson.class), httpSinkConfig.getSinkHttpJsonBodyTemplate(), protoParser, httpSinkConfig.getSinkHttpJsonBodyTemplateParseOption()));
}
}

Expand Down
Loading