Skip to content

Commit 9085a68

Browse files
Merge pull request #62 from Nasdaq/release-0.8.x
Release 0.8.x
2 parents eaba38f + f87fffa commit 9085a68

File tree

18 files changed

+544
-224
lines changed

18 files changed

+544
-224
lines changed

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ RUN mvn -B \
1010

1111
### Build Images ###
1212
## SDK app ##
13-
FROM strimzi/kafka:0.20.0-kafka-2.6.0 as sdk-app
13+
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 as sdk-app
1414

1515
COPY . /home/kafka
1616

ncds-sdk/pom.xml

Lines changed: 35 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,119 +7,111 @@
77
<parent>
88
<groupId>com.nasdaq.ncds</groupId>
99
<artifactId>ncds</artifactId>
10-
<version>0.7.0</version>
10+
<version>0.8.4</version>
1111
</parent>
1212

1313
<artifactId>ncds-sdk</artifactId>
1414
<packaging>jar</packaging>
1515
<properties>
16-
<kafkaScalaVersion>kafka_2.12</kafkaScalaVersion>
17-
<junit5.version>5.7.2</junit5.version>
1816
<junit5PlatformProvider.version>1.3.2</junit5PlatformProvider.version>
19-
<curatorTestVersion>2.12.0</curatorTestVersion>
20-
<slf4jVersion>1.7.30</slf4jVersion>
2117
<surefire.version>2.22.2</surefire.version>
22-
<strimzi.oauth.version>0.8.1</strimzi.oauth.version>
2318
</properties>
2419

20+
2521
<name>SDK</name>
2622
<description>Provide Development Kit to connect with Kafka</description>
2723

2824
<dependencies>
2925

26+
<dependency>
27+
<groupId>org.apache.avro</groupId>
28+
<artifactId>avro</artifactId>
29+
</dependency>
30+
<dependency>
31+
<groupId>org.apache.kafka</groupId>
32+
<artifactId>kafka-clients</artifactId>
33+
</dependency>
34+
<dependency>
35+
<groupId>com.fasterxml.jackson.core</groupId>
36+
<artifactId>jackson-core</artifactId>
37+
</dependency>
38+
<dependency>
39+
<groupId>com.fasterxml.jackson.core</groupId>
40+
<artifactId>jackson-annotations</artifactId>
41+
</dependency>
42+
43+
<dependency>
44+
<groupId>com.fasterxml.jackson.core</groupId>
45+
<artifactId>jackson-databind</artifactId>
46+
</dependency>
47+
3048
<!-- https://mvnrepository.com/artifact/org.json/json -->
3149
<dependency>
3250
<groupId>org.json</groupId>
3351
<artifactId>json</artifactId>
34-
<version>20190722</version>
3552
</dependency>
3653

54+
<dependency>
55+
<groupId>org.slf4j</groupId>
56+
<artifactId>slf4j-simple</artifactId>
57+
<scope>test</scope>
58+
</dependency>
3759
<!-- Kafka -auth -->
3860
<dependency>
3961
<groupId>io.strimzi</groupId>
4062
<artifactId>kafka-oauth-common</artifactId>
41-
<version>${strimzi.oauth.version}</version>
4263
</dependency>
4364

4465
<!-- Testing -->
4566

46-
<!-- Kafka -->
47-
<dependency>
48-
<groupId>org.apache.kafka</groupId>
49-
<artifactId>${kafkaScalaVersion}</artifactId>
50-
<version>${kafka.version}</version>
51-
<exclusions>
52-
<!-- Don't bring in kafka's logging framework -->
53-
<exclusion>
54-
<groupId>org.slf4j</groupId>
55-
<artifactId>slf4j-log4j12</artifactId>
56-
</exclusion>
57-
<exclusion>
58-
<groupId>javax.mail</groupId>
59-
<artifactId>mail</artifactId>
60-
</exclusion>
61-
</exclusions>
62-
<scope>test</scope>
63-
</dependency>
67+
<!-- Kafka -->
68+
<dependency>
69+
<groupId>org.apache.kafka</groupId>
70+
<artifactId>kafka_2.12</artifactId>
71+
</dependency>
6472

6573
<dependency>
6674
<groupId>com.salesforce.kafka.test</groupId>
6775
<artifactId>kafka-junit-core</artifactId>
68-
<version>3.2.3</version>
69-
<scope>test</scope>
7076
</dependency>
7177

7278
<!-- JUnit5 tests -->
7379
<dependency>
7480
<groupId>org.junit.jupiter</groupId>
7581
<artifactId>junit-jupiter-api</artifactId>
76-
<version>${junit5.version}</version>
77-
<scope>test</scope>
7882
</dependency>
7983

8084
<dependency>
8185
<groupId>org.junit.jupiter</groupId>
8286
<artifactId>junit-jupiter-params</artifactId>
83-
<version>${junit5.version}</version>
84-
<scope>test</scope>
8587
</dependency>
8688

8789
<!-- Mockito for mocks in tests -->
8890
<dependency>
8991
<groupId>org.mockito</groupId>
9092
<artifactId>mockito-core</artifactId>
91-
<version>2.28.2</version>
92-
<scope>test</scope>
9393
</dependency>
9494

9595
<dependency>
9696
<groupId>org.apache.curator</groupId>
9797
<artifactId>curator-test</artifactId>
98-
<version>${curatorTestVersion}</version>
99-
<scope>test</scope>
10098
</dependency>
10199

102100
<!-- Logging in tests -->
103101
<dependency>
104102
<groupId>org.slf4j</groupId>
105-
<artifactId>slf4j-simple</artifactId>
106-
<version>${slf4jVersion}</version>
107-
<scope>test</scope>
103+
<artifactId>slf4j-api</artifactId>
108104
</dependency>
109105

110106
<!-- Testing support class -->
111107
<dependency>
112108
<groupId>com.github.stephenc.high-scale-lib</groupId>
113109
<artifactId>high-scale-lib</artifactId>
114-
<version>1.1.4</version>
115-
<scope>test</scope>
116110
</dependency>
117111

118112
<dependency>
119113
<groupId>org.apache.commons</groupId>
120114
<artifactId>commons-lang3</artifactId>
121-
<version>3.3.2</version>
122-
<scope>test</scope>
123115
</dependency>
124116

125117
</dependencies>

ncds-sdk/src/main/java/com/nasdaq/ncdsclient/NCDSClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class NCDSClient {
3030
/**
3131
*
3232
* @param securityCfg - Authentication Security Properties passed from the Client
33+
* @param kafkaCfg
3334
* @throws Exception - Java Exception
3435
*/
3536
public NCDSClient(Properties securityCfg,Properties kafkaCfg) throws Exception {

ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,41 +32,37 @@ public class NasdaqKafkaAvroConsumer {
3232
private KafkaConsumer kafkaConsumer;
3333
private String clientID;
3434

35-
private Properties securityProps;
36-
private Properties kafkaProps;
35+
private Properties properties = new Properties();
3736
private ReadSchemaTopic readSchemaTopic = new ReadSchemaTopic();
3837

3938
public NasdaqKafkaAvroConsumer(Properties securityCfg,Properties kafkaCfg ) throws Exception {
4039
try {
41-
if (kafkaCfg == null)
40+
if (securityCfg == null) {
41+
properties.setProperty(AuthenticationConfigLoader.OAUTH_CLIENT_ID, "unit-test"); // Just for the unit tests.
42+
}
43+
else {
44+
properties.putAll(securityCfg);
45+
}
46+
if (kafkaCfg == null) {
4247
if (IsItJunit.isJUnitTest()) {
4348
Properties junitKafkaCfg = KafkaConfigLoader.loadConfig();
44-
kafkaProps = junitKafkaCfg;
49+
properties.putAll(junitKafkaCfg);
4550
}
4651
else {
4752
throw new Exception("Kafka Configuration not Defined ");
4853
}
49-
50-
else {
51-
kafkaProps = kafkaCfg;
52-
KafkaConfigLoader.validateAndAddSpecificProperties(kafkaProps);
53-
}
54-
55-
if (securityCfg == null) {
56-
securityProps = new Properties();
57-
securityProps.setProperty(AuthenticationConfigLoader.OAUTH_CLIENT_ID, "unit-test"); // Just for the unit tests.
5854
}
5955
else {
60-
securityProps = securityCfg;
61-
56+
properties.putAll(kafkaCfg);
57+
KafkaConfigLoader.validateAndAddSpecificProperties(properties);
6258
}
6359
}
6460
catch (Exception e) {
6561
throw (e);
6662
}
67-
readSchemaTopic.setSecurityProps(securityProps);
68-
readSchemaTopic.setKafkaProps(kafkaProps);
69-
this.clientID = getClientID(securityProps);
63+
readSchemaTopic.setSecurityProps(properties);
64+
readSchemaTopic.setKafkaProps(properties);
65+
this.clientID = getClientID(properties);
7066

7167
}
7268

@@ -86,7 +82,7 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception {
8682
kafkaConsumer = getConsumer(kafkaSchema, streamName);
8783
TopicPartition topicPartition = new TopicPartition(streamName + ".stream",0);
8884
kafkaConsumer.assign(Collections.singletonList(topicPartition));
89-
if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
85+
if(properties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
9086
return seekToMidNight(topicPartition);
9187
}
9288
}
@@ -144,21 +140,20 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
144140

145141
public KafkaAvroConsumer getConsumer(Schema avroSchema, String streamName) throws Exception {
146142
try {
147-
if(!IsItJunit.isJUnitTest()) {
148-
ConfigProperties.resolveAndExportToSystemProperties(securityProps);
149-
}
143+
// if(!IsItJunit.isJUnitTest()) {
144+
// ConfigProperties.resolveAndExportToSystemProperties(securityProps);
145+
// }
150146
//Properties kafkaProps = KafkaConfigLoader.loadConfig();
151147

152-
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
153-
kafkaProps.put("value.deserializer", AvroDeserializer.class.getName());
154-
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
155-
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
148+
properties.put("key.deserializer", StringDeserializer.class.getName());
149+
properties.put("value.deserializer", AvroDeserializer.class.getName());
150+
if(!properties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
151+
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
156152
}
157-
if(!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
158-
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + streamName + "_" + getDate());
153+
if(!properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
154+
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID);// + "_" + streamName + "_" + getDate());
159155
}
160-
ConfigProperties.resolve(kafkaProps);
161-
return new KafkaAvroConsumer(kafkaProps, avroSchema);
156+
return new KafkaAvroConsumer(properties, avroSchema);
162157
}
163158
catch (Exception e) {
164159
throw e;
@@ -211,7 +206,7 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
211206
kafkaConsumer = getConsumer(newsSchema, topic);
212207
TopicPartition topicPartition = new TopicPartition(topic + ".stream",0);
213208
kafkaConsumer.assign(Collections.singletonList(topicPartition));
214-
if(kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
209+
if(properties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).equals(OffsetResetStrategy.EARLIEST.toString().toLowerCase())) {
215210
return seekToMidNight(topicPartition);
216211
}
217212
return kafkaConsumer;

ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/ReadSchemaTopic.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public ReadSchemaTopic(){
3131
}
3232

3333
public Schema readSchema(String topic) throws Exception {
34-
KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
34+
KafkaConsumer schemaConsumer= getConsumer(getClientID(securityProps));
3535
Duration sec = Duration.ofSeconds(10);
3636
Schema messageSchema = null;
3737
ConsumerRecord<String,GenericRecord> lastRecord=null;
@@ -88,7 +88,7 @@ public Set<String> getTopics() throws Exception{
8888

8989
Set<String> topics = new HashSet<String>();
9090

91-
KafkaConsumer schemaConsumer= getConsumer("Control-"+getClientID(securityProps));
91+
KafkaConsumer schemaConsumer= getConsumer(getClientID(securityProps));
9292
Duration sec = Duration.ofSeconds(10);
9393
while (true) {
9494
ConsumerRecords<String, GenericRecord> schemaRecords = schemaConsumer.poll(sec);
@@ -188,4 +188,4 @@ private long getTodayMidNightTimeStamp(){
188188
return timestampFromMidnight;
189189
}
190190

191-
}
191+
}

0 commit comments

Comments (0)