-
Notifications
You must be signed in to change notification settings - Fork 21
Refactor KafkaListenerConfig to remove unused ObjectMapper and simplify product event handling #1217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Refactor KafkaListenerConfig to remove unused ObjectMapper and simplify product event handling #1217
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,7 +12,7 @@ Licensed under MIT License Copyright (c) 2022-2025 Raja Kolli. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.example.inventoryservice.services.ProductManageService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.example.inventoryservice.utils.AppConstants; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import jakarta.validation.Valid; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.Logger; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.slf4j.LoggerFactory; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.springframework.context.annotation.Configuration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -34,15 +34,12 @@ class KafkaListenerConfig { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final InventoryOrderManageService orderManageService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final ProductManageService productManageService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private final ObjectMapper objectMapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| KafkaListenerConfig( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| InventoryOrderManageService orderManageService, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ProductManageService productManageService, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ObjectMapper objectMapper) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ProductManageService productManageService) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.orderManageService = orderManageService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.productManageService = productManageService; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this.objectMapper = objectMapper; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // retries if processing of event fails | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -60,9 +57,10 @@ public void onEvent(OrderDto orderDto) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void onSaveProductEvent(@Payload String productDto) throws JsonProcessingException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void onSaveProductEvent(@Payload @Valid ProductDto productDto) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throws JsonProcessingException { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.info("Received Product: {}", productDto); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| productManageService.manage(objectMapper.readValue(productDto, ProductDto.class)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| productManageService.manage(productDto); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
59
to
64
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Restore a conversion step (or add a JsonDeserializer) before switching to The listener still receives raw JSON strings from Kafka (the consumer config in this service uses the default Apply this diff if you opt to restore the manual mapping: @@
- private final ProductManageService productManageService;
+ private final ProductManageService productManageService;
+ private final ObjectMapper objectMapper;
@@
- InventoryOrderManageService orderManageService,
- ProductManageService productManageService) {
+ InventoryOrderManageService orderManageService,
+ ProductManageService productManageService,
+ ObjectMapper objectMapper) {
this.orderManageService = orderManageService;
this.productManageService = productManageService;
+ this.objectMapper = objectMapper;
@@
- public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
+ public void onSaveProductEvent(@Payload String productDtoJson)
throws JsonProcessingException {
- log.info("Received Product: {}", productDto);
- productManageService.manage(productDto);
+ var productDto = objectMapper.readValue(productDtoJson, ProductDto.class);
+ log.info("Received Product: {}", productDto);
+ productManageService.manage(productDto);
}Based on learnings 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @coderabbitai, can you please wire up a Kafka consumer factory / message converter that materialises ProductDto instances before they reach this listener.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chainI'll help you configure a Kafka consumer factory with proper JSON deserialization for 🏁 Script executed: Length of output: 4079 Let me check the inventory service's current Kafka configuration: 🏁 Script executed: Length of output: 1748 Perfect! I can see that the inventory service doesn't have any Kafka consumer configuration in its Here's the solution - you need to add Kafka consumer configuration to your 🏁 Script executed: Length of output: 789 Let me find the actual configuration file location for the inventory service: 🏁 Script executed: Length of output: 730 Now I can see the inventory service uses profile-specific properties files. Let me check their contents and provide the proper Kafka configuration: 🏁 Script executed: Length of output: 2658 @rajadilipkolli Perfect! I can see that the inventory service uses profile-specific properties files. To configure Kafka to properly deserialize Option 1: Create a base application.properties file (Recommended)Create ######## Kafka Configuration #########
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos,com.example.inventoryservice.model.payload
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializerOption 2: Add to existing application-local.propertiesAlternatively, you can add the same Kafka configuration to your existing Key Points:
For Testing:Also add the same configuration to With this configuration, your current The 🧠 Learnings used |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @DltHandler | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.