
Commit 50efce6

Fix kafka module according to SK recent changes
Related to: spring-projects/spring-kafka#4087

In particular, the `MessageConverter` in Spring Kafka now has a new signature:

```
toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object consumer, Type type);
```
Parent: 1e71d19 · Commit: 50efce6
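For context, here is a minimal sketch of what a custom converter override looks like against this new signature. The class name and the extra `manualAckAvailable` header are illustrative only (they are not part of this commit), and it assumes the object passed at runtime is still an `org.springframework.kafka.support.Acknowledgment` when manual acks are enabled:

```java
import java.lang.reflect.Type;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Illustrative converter: overrides the new Object-typed signature and enriches headers.
class HeaderEnrichingConverter extends MessagingMessageConverter {

    @Override
    public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object consumer, Type type) {
        // Delegate to the standard conversion first.
        Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
        // Assumption: with manual acks enabled, the runtime object is still an Acknowledgment,
        // so code that needs it can down-cast after an instanceof check.
        boolean manualAckAvailable = acknowledgment instanceof Acknowledgment;
        return MessageBuilder.fromMessage(message)
                .setHeader("manualAckAvailable", manualAckAvailable)
                .build();
    }

}
```

The updated tests below follow the same pattern, only setting their own test-specific headers.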

2 files changed: 63 additions & 62 deletions


spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java

Lines changed: 30 additions & 32 deletions
```diff
@@ -47,7 +47,6 @@
 import org.springframework.kafka.event.ConsumerResumedEvent;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.KafkaMessageListenerContainer;
-import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -140,9 +139,7 @@ else if (event instanceof ConsumerResumedEvent) {
         gateway.setMessageConverter(new MessagingMessageConverter() {

             @Override
-            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
-                    Consumer<?, ?> con, Type type) {
-
+            public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object con, Type type) {
                 Message<?> message = super.toMessage(record, acknowledgment, con, type);
                 return MessageBuilder.fromMessage(message)
                         .setHeader("testHeader", "testValue")
@@ -165,14 +162,15 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
         assertThat(received).isNotNull();

         MessageHeaders headers = received.getHeaders();
-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
-        assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
-        assertThat(headers.get(KafkaHeaders.REPLY_TOPIC)).isEqualTo(topic2);
-        assertThat(headers.get("testHeader")).isEqualTo("testValue");
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic1)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L)
+                .containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
+                .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+                .containsEntry(KafkaHeaders.REPLY_TOPIC, topic2)
+                .containsEntry("testHeader", "testValue");

         reply.send(MessageBuilder.withPayload("FOO").copyHeaders(headers).build());

@@ -226,8 +224,7 @@ protected boolean doSend(Message<?> message, long timeout) {
         gateway.setMessageConverter(new MessagingMessageConverter() {

             @Override
-            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
-                    Consumer<?, ?> con, Type type) {
+            public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object con, Type type) {
                 Message<?> message = super.toMessage(record, acknowledgment, con, type);
                 return MessageBuilder.fromMessage(message)
                         .setHeader("testHeader", "testValue")
@@ -255,14 +252,15 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
         MessageHeaders headers = failed.getHeaders();
         reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());

-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic3);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
-        assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
-        assertThat(headers.get(KafkaHeaders.REPLY_TOPIC)).isEqualTo(topic4);
-        assertThat(headers.get("testHeader")).isEqualTo("testValue");
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic3)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L)
+                .containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
+                .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+                .containsEntry(KafkaHeaders.REPLY_TOPIC, topic4)
+                .containsEntry("testHeader", "testValue");

         ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, topic4);
         assertThat(record).has(partition(1));
@@ -307,8 +305,7 @@ protected boolean doSend(Message<?> message, long timeout) {
         gateway.setMessageConverter(new MessagingMessageConverter() {

             @Override
-            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
-                    Consumer<?, ?> con, Type type) {
+            public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object con, Type type) {
                 Message<?> message = super.toMessage(record, acknowledgment, con, type);
                 return MessageBuilder.fromMessage(message)
                         .setHeader("testHeader", "testValue")
@@ -336,14 +333,15 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
         MessageHeaders headers = failed.getHeaders();
         reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());

-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
-        assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
-        assertThat(headers.get(KafkaHeaders.REPLY_TOPIC)).isEqualTo(topic6);
-        assertThat(headers.get("testHeader")).isEqualTo("testValue");
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic5)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L)
+                .containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
+                .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+                .containsEntry(KafkaHeaders.REPLY_TOPIC, topic6)
+                .containsEntry("testHeader", "testValue");

         ConsumerRecord<Integer, String> record = KafkaTestUtils.getSingleRecord(consumer, topic6);
         assertThat(record).has(partition(1));
```
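Aside from the signature change, the per-header assertions above were consolidated into AssertJ's fluent map assertions: `assertThat(Map)` returns a `MapAssert`, and each `containsEntry(...)` returns the same assertion object, so the checks chain into one statement. This applies to `MessageHeaders` because it implements `java.util.Map`. A toy, self-contained illustration of the pattern (the keys and values here are arbitrary, not the actual Kafka header names):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;

// Toy example of AssertJ map-assertion chaining, as used in the updated tests.
class MapAssertChainingExample {

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of("topic", "topic1", "partition", 0);

        // Fails on the first missing entry, just like the previous one-assertion-per-line style.
        assertThat(headers)
                .containsEntry("topic", "topic1")
                .containsEntry("partition", 0);
    }

}
```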

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 33 additions & 30 deletions
```diff
@@ -151,8 +151,7 @@ void testInboundRecord(EmbeddedKafkaBroker embeddedKafka) {
         adapter.setRecordMessageConverter(new MessagingMessageConverter() {

             @Override
-            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
-                    Consumer<?, ?> consumer, Type type) {
+            public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object consumer, Type type) {
                 Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
                 return MessageBuilder.fromMessage(message).setHeader("testHeader", "testValue").build();
             }
@@ -189,27 +188,26 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
         assertThat(received.getPayload()).isInstanceOf(KafkaNull.class);

         headers = received.getHeaders();
-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(1L);
-        assertThat((Long) headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isGreaterThan(0L);
-        assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
-
-        assertThat(headers.get("testHeader")).isEqualTo("testValue");
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic1)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 1L)
+                .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+                .containsEntry("testHeader", "testValue");

         adapter.setMessageConverter(new RecordMessageConverter() {

             @Override
-            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
-                    Consumer<?, ?> consumer, Type type) {
+            public Message<?> toMessage(ConsumerRecord<?, ?> record, Object acknowledgment, Object con, Type type) {
                 throw new RuntimeException("testError");
             }

             @Override
             public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
                 return null;
             }
+
         });
         PollableChannel errors = new QueueChannel();
         adapter.setErrorChannel(errors);
@@ -272,10 +270,12 @@ protected boolean doSend(Message<?> message, long timeout) {
         assertThat(originalMessage).isNotNull();
         assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
         headers = originalMessage.getHeaders();
-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic4);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic4)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L);
+
         assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(3);

         assertThat(receivedMessageHistory.get()).isNotNull();
@@ -383,10 +383,11 @@ protected boolean doSend(Message<?> message, long timeout) {
         assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
                 .isSameAs(headers.get(KafkaHeaders.RAW_DATA));
         headers = originalMessage.getHeaders();
-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic5)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L);
         assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(1);

         adapter.stop();
@@ -397,7 +398,8 @@ protected boolean doSend(Message<?> message, long timeout) {
     void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
         Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", true);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 12);
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 24);
+        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 2000);

         DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
         ContainerProperties containerProps = new ContainerProperties(topic2);
@@ -513,14 +515,15 @@ void testInboundJson(EmbeddedKafkaBroker embeddedKafka) {
         assertThat(received).isNotNull();

         MessageHeaders headers = received.getHeaders();
-        assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic3);
-        assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-        assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
+        assertThat(headers)
+                .containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+                .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic3)
+                .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+                .containsEntry(KafkaHeaders.OFFSET, 0L)
+                .containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
+                .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+                .containsEntry("foo", "bar");

-        assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
-        assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
-        assertThat(headers.get("foo")).isEqualTo("bar");
         assertThat(received.getPayload()).isInstanceOf(Map.class);

         adapter.stop();
@@ -579,8 +582,8 @@ void testInboundJsonWithPayload(EmbeddedKafkaBroker embeddedKafka) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Test
     void testPauseResume() throws Exception {
-        ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
-        Consumer<Integer, String> consumer = mock(Consumer.class);
+        ConsumerFactory<Integer, String> cf = mock();
+        Consumer<Integer, String> consumer = mock();
         given(cf.createConsumer(eq("testPauseResumeGroup"), eq("clientId"), isNull(), any())).willReturn(consumer);
         final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
         records.put(new TopicPartition("foo", 0), Arrays.asList(
```
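The `testPauseResume` change above also switches to Mockito's type-inferred `mock()` (available in recent Mockito versions), which derives the mocked type from the assignment target instead of a raw class literal. A small standalone illustration, with an arbitrary `List` mock rather than the test's Kafka types:

```java
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import java.util.List;

// Toy example: the no-argument mock() infers the mocked type from the variable,
// avoiding the raw class literal and the unchecked cast for generic types.
class TypeInferredMockExample {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> legacy = mock(List.class); // old style: raw literal, unchecked for generics
        List<String> inferred = mock();         // new style: type inferred from the target

        given(inferred.size()).willReturn(42);
        System.out.println(legacy.size() + " " + inferred.size()); // prints "0 42"
    }

}
```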
