Skip to content

Commit 75760cf

Browse files
feat : use listener instead of calling receiver everytime
1 parent 333d463 commit 75760cf

File tree

8 files changed

+74
-86
lines changed

8 files changed

+74
-86
lines changed

kafka-reactor/boot-kafka-reactor-consumer/pom.xml

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<description>Demo project for Spring Boot</description>
1616

1717
<properties>
18-
<java.version>17</java.version>
18+
<java.version>21</java.version>
1919
<springdoc-openapi.version>2.6.0</springdoc-openapi.version>
2020

2121
<spotless.version>2.43.0</spotless.version>
@@ -28,11 +28,12 @@
2828
</dependency>
2929
<dependency>
3030
<groupId>org.springframework.boot</groupId>
31-
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
31+
<artifactId>spring-boot-starter-webflux</artifactId>
3232
</dependency>
33+
3334
<dependency>
3435
<groupId>org.springframework.boot</groupId>
35-
<artifactId>spring-boot-starter-webflux</artifactId>
36+
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
3637
</dependency>
3738
<dependency>
3839
<groupId>org.liquibase</groupId>
@@ -42,6 +43,17 @@
4243
<groupId>org.springframework</groupId>
4344
<artifactId>spring-jdbc</artifactId>
4445
</dependency>
46+
<dependency>
47+
<groupId>org.postgresql</groupId>
48+
<artifactId>postgresql</artifactId>
49+
<scope>runtime</scope>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.postgresql</groupId>
53+
<artifactId>r2dbc-postgresql</artifactId>
54+
<scope>runtime</scope>
55+
</dependency>
56+
4557
<dependency>
4658
<groupId>org.springframework.kafka</groupId>
4759
<artifactId>spring-kafka</artifactId>
@@ -51,29 +63,13 @@
5163
<artifactId>reactor-kafka</artifactId>
5264
<version>1.3.22</version>
5365
</dependency>
66+
5467
<dependency>
5568
<groupId>org.springdoc</groupId>
5669
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
5770
<version>${springdoc-openapi.version}</version>
5871
</dependency>
5972

60-
61-
<dependency>
62-
<groupId>org.postgresql</groupId>
63-
<artifactId>postgresql</artifactId>
64-
<scope>runtime</scope>
65-
</dependency>
66-
<dependency>
67-
<groupId>org.postgresql</groupId>
68-
<artifactId>r2dbc-postgresql</artifactId>
69-
<scope>runtime</scope>
70-
</dependency>
71-
<dependency>
72-
<groupId>org.projectlombok</groupId>
73-
<artifactId>lombok</artifactId>
74-
<optional>true</optional>
75-
</dependency>
76-
7773
<dependency>
7874
<groupId>org.springframework.boot</groupId>
7975
<artifactId>spring-boot-devtools</artifactId>
@@ -127,14 +123,6 @@
127123
<plugin>
128124
<groupId>org.springframework.boot</groupId>
129125
<artifactId>spring-boot-maven-plugin</artifactId>
130-
<configuration>
131-
<excludes>
132-
<exclude>
133-
<groupId>org.projectlombok</groupId>
134-
<artifactId>lombok</artifactId>
135-
</exclude>
136-
</excludes>
137-
</configuration>
138126
</plugin>
139127
<plugin>
140128
<groupId>com.diffplug.spotless</groupId>
@@ -143,7 +131,7 @@
143131
<configuration>
144132
<java>
145133
<palantirJavaFormat>
146-
<version>2.42.0</version>
134+
<version>2.47.0</version>
147135
</palantirJavaFormat>
148136
<importOrder/>
149137
<removeUnusedImports/>
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,22 @@
11
package com.example.boot.kafka.reactor.config;
22

3-
import com.example.boot.kafka.reactor.entity.MessageDTO;
43
import com.example.boot.kafka.reactor.util.AppConstants;
5-
import java.util.Collections;
6-
import lombok.extern.slf4j.Slf4j;
74
import org.apache.kafka.clients.admin.NewTopic;
8-
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
97
import org.springframework.context.annotation.Bean;
108
import org.springframework.context.annotation.Configuration;
119
import org.springframework.kafka.annotation.EnableKafka;
12-
import reactor.kafka.receiver.KafkaReceiver;
13-
import reactor.kafka.receiver.ReceiverOptions;
1410

1511
@EnableKafka
1612
@Configuration(proxyBeanMethods = false)
17-
@Slf4j
18-
public class KafkaConfiguration {
13+
class KafkaConfiguration {
14+
15+
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
1916

2017
@Bean
2118
NewTopic helloTopic() {
2219
log.info("Creating helloTopic");
2320
return new NewTopic(AppConstants.HELLO_TOPIC, 1, (short) 1);
2421
}
25-
26-
@Bean
27-
KafkaReceiver<Integer, MessageDTO> receiver(KafkaProperties properties) {
28-
log.info("Creating receiver");
29-
ReceiverOptions<Integer, MessageDTO> receiverOptions = ReceiverOptions.<Integer, MessageDTO>create(
30-
properties.buildConsumerProperties())
31-
.subscription(Collections.singleton(AppConstants.HELLO_TOPIC))
32-
.addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
33-
.addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
34-
35-
return KafkaReceiver.create(receiverOptions);
36-
}
3722
}

kafka-reactor/boot-kafka-reactor-consumer/src/main/java/com/example/boot/kafka/reactor/config/SwaggerConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77

88
@Configuration(proxyBeanMethods = false)
99
@OpenAPIDefinition(info = @Info(title = "boot-kafka-reactor-consumer", version = "v1"), servers = @Server(url = "/"))
10-
public class SwaggerConfig {}
10+
class SwaggerConfig {}

kafka-reactor/boot-kafka-reactor-consumer/src/main/java/com/example/boot/kafka/reactor/controller/MessageController.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.example.boot.kafka.reactor.entity.MessageDTO;
44
import com.example.boot.kafka.reactor.service.MessageService;
5-
import lombok.RequiredArgsConstructor;
65
import org.springframework.http.MediaType;
76
import org.springframework.web.bind.annotation.GetMapping;
87
import org.springframework.web.bind.annotation.RequestMapping;
@@ -11,13 +10,16 @@
1110

1211
@RestController
1312
@RequestMapping("/messages")
14-
@RequiredArgsConstructor
1513
class MessageController {
1614

1715
private final MessageService messageService;
1816

17+
public MessageController(MessageService messageService) {
18+
this.messageService = messageService;
19+
}
20+
1921
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
20-
public Flux<MessageDTO> events() {
22+
Flux<MessageDTO> events() {
2123
return messageService.fetchMessages();
2224
}
2325
}

kafka-reactor/boot-kafka-reactor-consumer/src/main/java/com/example/boot/kafka/reactor/service/MessageService.java

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,42 +2,50 @@
22

33
import com.example.boot.kafka.reactor.entity.MessageDTO;
44
import com.example.boot.kafka.reactor.repository.MessageRepository;
5-
import java.time.LocalDateTime;
6-
import lombok.RequiredArgsConstructor;
7-
import lombok.extern.slf4j.Slf4j;
5+
import com.example.boot.kafka.reactor.util.AppConstants;
6+
import java.time.Instant;
7+
import java.time.ZoneId;
8+
import java.time.ZonedDateTime;
9+
import org.apache.kafka.clients.consumer.ConsumerRecord;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.kafka.annotation.KafkaListener;
13+
import org.springframework.kafka.support.KafkaHeaders;
14+
import org.springframework.messaging.handler.annotation.Header;
815
import org.springframework.stereotype.Service;
916
import reactor.core.publisher.Flux;
1017
import reactor.core.publisher.Mono;
11-
import reactor.kafka.receiver.KafkaReceiver;
12-
import reactor.kafka.receiver.ReceiverOffset;
1318

1419
@Service
15-
@RequiredArgsConstructor
16-
@Slf4j
1720
public class MessageService {
1821

19-
private final KafkaReceiver<Integer, MessageDTO> receiver;
22+
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
23+
2024
private final MessageRepository messageRepository;
2125

22-
private Mono<MessageDTO> saveMessage(MessageDTO messageDTO) {
23-
return messageRepository.save(messageDTO);
26+
public MessageService(MessageRepository messageRepository) {
27+
this.messageRepository = messageRepository;
28+
}
29+
30+
@KafkaListener(topics = AppConstants.HELLO_TOPIC, groupId = "reactivekafka")
31+
Mono<MessageDTO> listen(
32+
@Header(KafkaHeaders.RECEIVED_KEY) Integer key, ConsumerRecord<Integer, MessageDTO> consumerRecord) {
33+
ZonedDateTime zdt =
34+
ZonedDateTime.ofInstant(Instant.ofEpochMilli(consumerRecord.timestamp()), ZoneId.systemDefault());
35+
log.info(
36+
"Received message: topic-partition={} offset={} timestamp={} key={} value={}",
37+
consumerRecord.partition(),
38+
consumerRecord.offset(),
39+
zdt,
40+
key,
41+
consumerRecord.value());
42+
return messageRepository.save(consumerRecord.value());
2443
}
2544

2645
public Flux<MessageDTO> fetchMessages() {
27-
return receiver.receive()
28-
.map(record -> {
29-
ReceiverOffset offset = record.receiverOffset();
30-
var value = record.value();
31-
log.info(
32-
"Received message: topic-partition={} offset={} timestamp={} key={} value={}",
33-
offset.topicPartition(),
34-
offset.offset(),
35-
LocalDateTime.now(),
36-
record.key(),
37-
value);
38-
offset.acknowledge();
39-
return value;
40-
})
41-
.flatMap(this::saveMessage);
46+
return messageRepository
47+
.findAll()
48+
.doOnNext(messageDTO -> log.info("Retrieved Message :{}", messageDTO))
49+
.doOnError(e -> log.error("Reading Error ", e));
4250
}
4351
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.example.boot.kafka.reactor.util;
22

3-
import lombok.experimental.UtilityClass;
4-
5-
@UtilityClass
63
public class AppConstants {
74
public static final String HELLO_TOPIC = "hello";
5+
6+
private AppConstants() {
7+
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
8+
}
89
}

kafka-reactor/boot-kafka-reactor-consumer/src/test/java/com/example/boot/kafka/reactor/BootKafkaReactorConsumerApplicationTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
import java.security.SecureRandom;
66
import java.time.LocalDateTime;
77
import java.util.concurrent.TimeUnit;
8-
import lombok.extern.slf4j.Slf4j;
98
import org.apache.kafka.clients.producer.ProducerRecord;
109
import org.apache.kafka.clients.producer.RecordMetadata;
1110
import org.junit.jupiter.api.Test;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1213
import org.springframework.beans.factory.annotation.Autowired;
1314
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
1415
import org.springframework.boot.test.context.SpringBootTest;
@@ -23,9 +24,10 @@
2324
@SpringBootTest(classes = TestBootKafkaReactorConsumerApplication.class)
2425
@ActiveProfiles("test")
2526
@AutoConfigureWebTestClient
26-
@Slf4j
2727
class BootKafkaReactorConsumerApplicationTests {
2828

29+
private static final Logger log = LoggerFactory.getLogger(BootKafkaReactorConsumerApplicationTests.class);
30+
2931
@Autowired
3032
KafkaSender<Integer, MessageDTO> sender;
3133

kafka-reactor/boot-kafka-reactor-consumer/src/test/java/com/example/boot/kafka/reactor/TestBootKafkaReactorConsumerApplication.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
import com.example.boot.kafka.reactor.entity.MessageDTO;
44
import java.util.Map;
5-
import lombok.extern.slf4j.Slf4j;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
67
import org.springframework.boot.SpringApplication;
78
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
89
import org.springframework.boot.test.context.TestConfiguration;
@@ -16,8 +17,9 @@
1617
import reactor.kafka.sender.SenderOptions;
1718

1819
@TestConfiguration(proxyBeanMethods = false)
19-
@Slf4j
20-
public class TestBootKafkaReactorConsumerApplication {
20+
class TestBootKafkaReactorConsumerApplication {
21+
22+
private static final Logger log = LoggerFactory.getLogger(TestBootKafkaReactorConsumerApplication.class);
2123

2224
@Bean
2325
@ServiceConnection

0 commit comments

Comments
 (0)