Skip to content

Commit d966026

Browse files
committed
Fix KafkaItemReader ExecutionContext deserialization error when using Jackson2ExecutionContextStringSerializer
Signed-off-by: Hyunwoo Jung <hyunwoojung@kakao.com>
1 parent 30b5120 commit d966026

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemReader.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,8 @@ public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
5656

5757
private static final long DEFAULT_POLL_TIMEOUT = 30L;
5858

59+
private final String topicName;
60+
5961
private final List<TopicPartition> topicPartitions;
6062

6163
private Map<TopicPartition, Long> partitionOffsets;
@@ -110,6 +112,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Int
110112
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
111113
this.consumerProperties = consumerProperties;
112114
Assert.hasLength(topicName, "Topic name must not be null or empty");
115+
this.topicName = topicName;
113116
Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
114117
this.topicPartitions = new ArrayList<>();
115118
for (Integer partition : partitions) {
@@ -174,10 +177,10 @@ public void open(ExecutionContext executionContext) {
174177
}
175178
}
176179
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
177-
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
178-
.get(TOPIC_PARTITION_OFFSETS);
179-
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
180-
this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
180+
Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
181+
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
182+
this.partitionOffsets.put(new TopicPartition(this.topicName, Integer.parseInt(entry.getKey())),
183+
entry.getValue() == 0 ? 0 : entry.getValue() + 1);
181184
}
182185
}
183186
this.kafkaConsumer.assign(this.topicPartitions);
@@ -203,7 +206,11 @@ public V read() {
203206
@Override
204207
public void update(ExecutionContext executionContext) {
205208
if (this.saveState) {
206-
executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
209+
Map<String, Long> offsets = new HashMap<>();
210+
for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
211+
offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
212+
}
213+
executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
207214
}
208215
this.kafkaConsumer.commitSync();
209216
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderIntegrationTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -256,8 +256,8 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
256256
future.get();
257257
}
258258
ExecutionContext executionContext = new ExecutionContext();
259-
Map<TopicPartition, Long> offsets = new HashMap<>();
260-
offsets.put(new TopicPartition("topic3", 0), 1L);
259+
Map<String, Long> offsets = new HashMap<>();
260+
offsets.put("0", 1L);
261261
executionContext.put("topic.partition.offsets", offsets);
262262

263263
// topic3-0: val0, val1, val2, val3, val4
@@ -297,9 +297,9 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
297297
}
298298

299299
ExecutionContext executionContext = new ExecutionContext();
300-
Map<TopicPartition, Long> offsets = new HashMap<>();
301-
offsets.put(new TopicPartition("topic4", 0), 1L);
302-
offsets.put(new TopicPartition("topic4", 1), 2L);
300+
Map<String, Long> offsets = new HashMap<>();
301+
offsets.put("0", 1L);
302+
offsets.put("1", 2L);
303303
executionContext.put("topic.partition.offsets", offsets);
304304

305305
// topic4-0: val0, val2, val4, val6

0 commit comments

Comments
 (0)