Skip to content

Commit 326c9db

Browse files
more fixes
1 parent 607f64b commit 326c9db

File tree

9 files changed

+437
-388
lines changed

9 files changed

+437
-388
lines changed

kafka-streams/analytics-spring-cloud-streams-kafka-consumer/src/test/java/com/example/analytics/AdvancedStreamsOperationsTest.java

Lines changed: 67 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import org.junit.jupiter.api.BeforeEach;
2626
import org.junit.jupiter.api.Disabled;
2727
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.TestInstance;
2829

30+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2931
class AdvancedStreamsOperationsTest {
3032

3133
private TopologyTestDriver testDriver;
@@ -38,53 +40,61 @@ class AdvancedStreamsOperationsTest {
3840

3941
@BeforeEach
4042
void setUp() {
41-
props = new Properties();
42-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-operations-test");
43-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
44-
props.put(
45-
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
46-
props.put(
47-
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
48-
Serdes.String().getClass().getName());
49-
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); // Lower for tests
50-
props.put(
51-
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
52-
LogAndContinueExceptionHandler.class.getName());
53-
54-
// Build the topology for testing
55-
StreamsBuilder builder = new StreamsBuilder();
56-
57-
// Use the common JsonSerdeUtils
58-
Serde<PageViewEvent> pageViewSerde =
59-
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper);
60-
61-
// Create a KStream from the input topic
62-
KStream<String, PageViewEvent> pageViewStream =
63-
builder.stream("page-views", Consumed.with(Serdes.String(), pageViewSerde));
43+
try {
44+
props = new Properties();
45+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "advanced-operations-test");
46+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
47+
props.put(
48+
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
49+
Serdes.String().getClass().getName());
50+
props.put(
51+
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
52+
JsonSerdeUtils.getJsonClass().getName());
53+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); // Lower for tests
54+
props.put(
55+
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
56+
LogAndContinueExceptionHandler.class.getName());
57+
58+
// Build the topology for testing
59+
StreamsBuilder builder = new StreamsBuilder();
60+
61+
// Use the common JsonSerdeUtils
62+
Serde<PageViewEvent> pageViewSerde =
63+
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper);
64+
65+
// Create a KStream from the input topic
66+
KStream<String, PageViewEvent> pageViewStream =
67+
builder.stream("page-views", Consumed.with(Serdes.String(), pageViewSerde));
68+
69+
// Perform grouping and aggregation (summing durations)
70+
pageViewStream
71+
.selectKey((key, value) -> value.getUserId())
72+
.groupByKey()
73+
.aggregate(
74+
() -> 0L, // Initial value
75+
(key, value, aggregate) -> aggregate + value.getDuration(),
76+
Materialized.with(Serdes.String(), Serdes.Long()))
77+
.toStream()
78+
.to("user-total-duration", Produced.with(Serdes.String(), Serdes.Long()));
79+
80+
// Create test driver and topics
81+
testDriver = new TopologyTestDriver(builder.build(), props);
82+
83+
inputTopic =
84+
testDriver.createInputTopic(
85+
"page-views", Serdes.String().serializer(), pageViewSerde.serializer());
6486

65-
// Perform grouping and aggregation (summing durations)
66-
pageViewStream
67-
.selectKey((key, value) -> value.getUserId())
68-
.groupByKey()
69-
.aggregate(
70-
() -> 0L, // Initial value
71-
(key, value, aggregate) -> aggregate + value.getDuration(),
72-
Materialized.with(Serdes.String(), Serdes.Long()))
73-
.toStream()
74-
.to("user-total-duration", Produced.with(Serdes.String(), Serdes.Long()));
75-
76-
// Create test driver and topics
77-
testDriver = new TopologyTestDriver(builder.build(), props);
78-
79-
inputTopic =
80-
testDriver.createInputTopic(
81-
"page-views", Serdes.String().serializer(), pageViewSerde.serializer());
82-
83-
outputTopic =
84-
testDriver.createOutputTopic(
85-
"user-total-duration",
86-
Serdes.String().deserializer(),
87-
Serdes.Long().deserializer());
87+
outputTopic =
88+
testDriver.createOutputTopic(
89+
"user-total-duration",
90+
Serdes.String().deserializer(),
91+
Serdes.Long().deserializer());
92+
} catch (Exception e) {
93+
if (testDriver != null) {
94+
testDriver.close();
95+
}
96+
throw e;
97+
}
8898
}
8999

90100
@AfterEach
@@ -144,8 +154,10 @@ void testBranchingOperations() {
144154
branches[2].to("long-duration", Produced.with(Serdes.String(), pageViewSerde));
145155

146156
// Create a new test driver with this topology
147-
TopologyTestDriver branchTestDriver = new TopologyTestDriver(builder.build(), props);
157+
TopologyTestDriver branchTestDriver = null;
148158
try {
159+
branchTestDriver = new TopologyTestDriver(builder.build(), props);
160+
149161
// Create test topics
150162
TestInputTopic<String, PageViewEvent> branchInputTopic =
151163
branchTestDriver.createInputTopic(
@@ -191,8 +203,15 @@ void testBranchingOperations() {
191203

192204
assertThat(longEvents).hasSize(1);
193205
assertThat(longEvents.getFirst().getDuration()).isEqualTo(75);
206+
} catch (Exception e) {
207+
if (branchTestDriver != null) {
208+
branchTestDriver.close();
209+
}
210+
throw e;
194211
} finally {
195-
branchTestDriver.close();
212+
if (branchTestDriver != null) {
213+
branchTestDriver.close();
214+
}
196215
}
197216
}
198217
}

kafka-streams/analytics-spring-cloud-streams-kafka-consumer/src/test/java/com/example/analytics/ProcessorApiTest.java

Lines changed: 63 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.junit.jupiter.api.AfterEach;
2727
import org.junit.jupiter.api.BeforeEach;
2828
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.TestInstance;
2930

31+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
3032
class ProcessorApiTest {
3133

3234
private TopologyTestDriver testDriver;
@@ -37,53 +39,67 @@ class ProcessorApiTest {
3739

3840
@BeforeEach
3941
void setUp() {
40-
// Configure Kafka Streams for testing
41-
Properties props = new Properties();
42-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-test");
43-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
44-
45-
// Create topology with processor API
46-
Topology topology = new Topology();
47-
48-
// Add source node
49-
topology.addSource(
50-
"PageViewSource",
51-
Serdes.String().deserializer(),
52-
JsonSerdeUtils.jsonSerde(PageViewEvent.class).deserializer(),
53-
"page-views");
54-
55-
// Add processor node
56-
topology.addProcessor("PageViewProcessor", () -> new PageViewProcessor(), "PageViewSource");
57-
58-
// Add state store
59-
topology.addStateStore(
60-
Stores.keyValueStoreBuilder(
61-
Stores.persistentKeyValueStore(STATE_STORE_NAME),
62-
Serdes.String(),
63-
Serdes.Long()),
64-
"PageViewProcessor");
65-
66-
// Add sink node
67-
topology.addSink(
68-
"PageCountSink",
69-
"page-counts",
70-
new StringSerializer(),
71-
new LongSerializer(),
72-
"PageViewProcessor");
73-
74-
// Create the test driver
75-
testDriver = new TopologyTestDriver(topology, props);
76-
77-
// Create test topics
78-
inputTopic =
79-
testDriver.createInputTopic(
80-
"page-views",
81-
new StringSerializer(),
82-
JsonSerdeUtils.jsonSerde(PageViewEvent.class).serializer());
83-
84-
outputTopic =
85-
testDriver.createOutputTopic(
86-
"page-counts", new StringDeserializer(), new LongDeserializer());
42+
try {
43+
// Configure Kafka Streams for testing
44+
Properties props = new Properties();
45+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-api-test");
46+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
47+
props.put(
48+
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
49+
Serdes.String().getClass().getName());
50+
props.put(
51+
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
52+
JsonSerdeUtils.getJsonClass().getName());
53+
54+
// Create topology with processor API
55+
Topology topology = new Topology();
56+
57+
// Add source node
58+
topology.addSource(
59+
"PageViewSource",
60+
Serdes.String().deserializer(),
61+
JsonSerdeUtils.jsonSerde(PageViewEvent.class).deserializer(),
62+
"page-views");
63+
64+
// Add processor node
65+
topology.addProcessor(
66+
"PageViewProcessor", () -> new PageViewProcessor(), "PageViewSource");
67+
68+
// Add state store
69+
topology.addStateStore(
70+
Stores.keyValueStoreBuilder(
71+
Stores.persistentKeyValueStore(STATE_STORE_NAME),
72+
Serdes.String(),
73+
Serdes.Long()),
74+
"PageViewProcessor");
75+
76+
// Add sink node
77+
topology.addSink(
78+
"PageCountSink",
79+
"page-counts",
80+
new StringSerializer(),
81+
new LongSerializer(),
82+
"PageViewProcessor");
83+
84+
// Create the test driver
85+
testDriver = new TopologyTestDriver(topology, props);
86+
87+
// Create test topics
88+
inputTopic =
89+
testDriver.createInputTopic(
90+
"page-views",
91+
new StringSerializer(),
92+
JsonSerdeUtils.jsonSerde(PageViewEvent.class).serializer());
93+
94+
outputTopic =
95+
testDriver.createOutputTopic(
96+
"page-counts", new StringDeserializer(), new LongDeserializer());
97+
} catch (Exception e) {
98+
if (testDriver != null) {
99+
testDriver.close();
100+
}
101+
throw e;
102+
}
87103
}
88104

89105
@AfterEach

kafka-streams/analytics-spring-cloud-streams-kafka-consumer/src/test/java/com/example/analytics/StateStoreAndInteractiveQueriesTest.java

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.junit.jupiter.api.AfterEach;
2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.TestInstance;
2627

28+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2729
class StateStoreAndInteractiveQueriesTest {
2830

2931
private TopologyTestDriver testDriver;
@@ -33,45 +35,54 @@ class StateStoreAndInteractiveQueriesTest {
3335

3436
@BeforeEach
3537
void setUp() {
36-
// Configure Kafka Streams for testing
37-
Properties props = new Properties();
38-
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-test");
39-
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
40-
props.put(
41-
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
42-
props.put(
43-
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
44-
Serdes.String().getClass().getName());
45-
46-
// Create a topology that processes page views and maintains average duration per page
47-
StreamsBuilder builder = new StreamsBuilder();
48-
49-
// Create state store for average duration
50-
StoreBuilder<KeyValueStore<String, AvgDuration>> storeBuilder =
51-
Stores.keyValueStoreBuilder(
52-
Stores.persistentKeyValueStore(AVG_DURATION_STORE),
53-
Serdes.String(),
54-
new AvgDurationSerde());
55-
56-
builder.addStateStore(storeBuilder);
57-
58-
// Process the input stream with a custom processor
59-
builder.stream(
60-
"page-views",
61-
Consumed.with(
62-
Serdes.String(),
63-
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper)))
64-
.process(PageViewAverageProcessor::new, AVG_DURATION_STORE);
65-
66-
// Build topology
67-
testDriver = new TopologyTestDriver(builder.build(), props);
68-
69-
// Create input topic
70-
inputTopic =
71-
testDriver.createInputTopic(
72-
"page-views",
73-
Serdes.String().serializer(),
74-
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper).serializer());
38+
try {
39+
// Configure Kafka Streams for testing
40+
Properties props = new Properties();
41+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-test");
42+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
43+
props.put(
44+
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
45+
Serdes.String().getClass().getName());
46+
props.put(
47+
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
48+
JsonSerdeUtils.getJsonClass().getName());
49+
50+
// Create a topology that processes page views and maintains average duration per page
51+
StreamsBuilder builder = new StreamsBuilder();
52+
53+
// Create state store for average duration
54+
StoreBuilder<KeyValueStore<String, AvgDuration>> storeBuilder =
55+
Stores.keyValueStoreBuilder(
56+
Stores.persistentKeyValueStore(AVG_DURATION_STORE),
57+
Serdes.String(),
58+
new AvgDurationSerde());
59+
60+
builder.addStateStore(storeBuilder);
61+
62+
// Process the input stream with a custom processor
63+
builder.stream(
64+
"page-views",
65+
Consumed.with(
66+
Serdes.String(),
67+
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper)))
68+
.process(PageViewAverageProcessor::new, AVG_DURATION_STORE);
69+
70+
// Build topology
71+
testDriver = new TopologyTestDriver(builder.build(), props);
72+
73+
// Create input topic
74+
inputTopic =
75+
testDriver.createInputTopic(
76+
"page-views",
77+
Serdes.String().serializer(),
78+
JsonSerdeUtils.jsonSerde(PageViewEvent.class, objectMapper)
79+
.serializer());
80+
} catch (Exception e) {
81+
if (testDriver != null) {
82+
testDriver.close();
83+
}
84+
throw e;
85+
}
7586
}
7687

7788
@AfterEach

0 commit comments

Comments
 (0)