inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java

@@ -12,7 +12,7 @@ Licensed under MIT License Copyright (c) 2022-2025 Raja Kolli.
 import com.example.inventoryservice.services.ProductManageService;
 import com.example.inventoryservice.utils.AppConstants;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.validation.Valid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Configuration;
@@ -34,15 +34,12 @@ class KafkaListenerConfig {
 
     private final InventoryOrderManageService orderManageService;
     private final ProductManageService productManageService;
-    private final ObjectMapper objectMapper;
 
     KafkaListenerConfig(
             InventoryOrderManageService orderManageService,
-            ProductManageService productManageService,
-            ObjectMapper objectMapper) {
+            ProductManageService productManageService) {
         this.orderManageService = orderManageService;
         this.productManageService = productManageService;
-        this.objectMapper = objectMapper;
     }
 
     // retries if processing of event fails
@@ -60,9 +57,10 @@ public void onEvent(OrderDto orderDto) {
     }
 
     @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
-    public void onSaveProductEvent(@Payload String productDto) throws JsonProcessingException {
+    public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
+            throws JsonProcessingException {
         log.info("Received Product: {}", productDto);
-        productManageService.manage(objectMapper.readValue(productDto, ProductDto.class));
+        productManageService.manage(productDto);
     }
Comment on lines 59 to 64

coderabbitai bot (Contributor) commented on Sep 28, 2025:

⚠️ Potential issue | 🔴 Critical

Restore a conversion step (or add a JsonDeserializer) before switching to ProductDto.

The listener still receives raw JSON strings from Kafka (the consumer config in this service uses the default StringDeserializer, and this PR doesn’t introduce a JsonDeserializer or message converter). With the new signature Spring will try to hand a String payload to a ProductDto parameter and throw MessageConversionException, so every product event will now fail. Please either keep the string payload (and deserialize with ObjectMapper) or wire up a Kafka consumer factory / message converter that materialises ProductDto instances before they reach this listener.

Apply this diff if you opt to restore the manual mapping:

@@
-    private final ProductManageService productManageService;
+    private final ProductManageService productManageService;
+    private final ObjectMapper objectMapper;
@@
-            InventoryOrderManageService orderManageService,
-            ProductManageService productManageService) {
+            InventoryOrderManageService orderManageService,
+            ProductManageService productManageService,
+            ObjectMapper objectMapper) {
         this.orderManageService = orderManageService;
         this.productManageService = productManageService;
+        this.objectMapper = objectMapper;
@@
-    public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
+    public void onSaveProductEvent(@Payload String productDtoJson)
             throws JsonProcessingException {
-        log.info("Received Product: {}", productDto);
-        productManageService.manage(productDto);
+        var productDto = objectMapper.readValue(productDtoJson, ProductDto.class);
+        log.info("Received Product: {}", productDto);
+        productManageService.manage(productDto);
     }

Based on learnings

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
-    public void onSaveProductEvent(@Payload @Valid ProductDto productDto)
-            throws JsonProcessingException {
-        log.info("Received Product: {}", productDto);
-        productManageService.manage(productDto);
-    }
+    // Restore manual JSON deserialization for backward compatibility
+    private final ProductManageService productManageService;
+    private final ObjectMapper objectMapper;
+
+    public KafkaListenerConfig(
+            InventoryOrderManageService orderManageService,
+            ProductManageService productManageService,
+            ObjectMapper objectMapper) {
+        this.orderManageService = orderManageService;
+        this.productManageService = productManageService;
+        this.objectMapper = objectMapper;
+    }
+
+    @KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
+    public void onSaveProductEvent(@Payload String productDtoJson)
+            throws JsonProcessingException {
+        var productDto = objectMapper.readValue(productDtoJson, ProductDto.class);
+        log.info("Received Product: {}", productDto);
+        productManageService.manage(productDto);
+    }
🤖 Prompt for AI Agents
In
inventory-service/src/main/java/com/example/inventoryservice/config/KafkaListenerConfig.java
around lines 59-64, the listener signature was changed to accept ProductDto but
the consumer still uses StringDeserializer so Spring will receive raw JSON and
throw MessageConversionException; either revert the method to accept a String
and manually map to ProductDto using ObjectMapper (catch/handle
JsonProcessingException) before calling productManageService.manage(productDto),
or configure Kafka to deliver ProductDto by wiring a
ConsumerFactory/JsonDeserializer or a MessageConverter (configure
KafkaListenerContainerFactory with a RecordMessageConverter or set value
deserializer to JsonDeserializer<ProductDto> and trust packages), then restore
the ProductDto parameter so messages are deserialized before the listener.
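
For reference, a minimal sketch of the consumer-factory option named in the prompt above, built around Spring Kafka's JsonDeserializer. The bean names, the String key type, and the bootstrap/group-id values are illustrative placeholders, not settings taken from this repository:

// Sketch only: a ProductDto-aware consumer factory and listener container factory.
// Broker address, group id, key type and bean names are illustrative placeholders.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.example.inventoryservice.model.payload.ProductDto;

@Configuration
class ProductKafkaConsumerConfig {

    @Bean
    ConsumerFactory<String, ProductDto> productConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "product");

        // Deserialize record values straight into ProductDto instances.
        JsonDeserializer<ProductDto> valueDeserializer = new JsonDeserializer<>(ProductDto.class);
        valueDeserializer.addTrustedPackages(
                "com.example.common.dtos", "com.example.inventoryservice.model.payload");

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, ProductDto> productKafkaListenerContainerFactory(
            ConsumerFactory<String, ProductDto> productConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, ProductDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(productConsumerFactory);
        return factory;
    }
}

The listener would then reference this factory via @KafkaListener(containerFactory = "productKafkaListenerContainerFactory"), unless the factory is registered as the application default.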

rajadilipkolli (Owner, Author) commented:
@coderabbitai, can you please wire up a Kafka consumer factory / message converter that materialises ProductDto instances before they reach this listener?

coderabbitai bot (Contributor) replied:

🧩 Analysis chain

I'll help you configure a Kafka consumer factory with proper JSON deserialization for ProductDto. Let me first examine the existing Kafka configuration in your codebase.


🏁 Script executed:

#!/bin/bash
# Find existing Kafka configuration files
fd -e java -e yaml -e yml -e properties | xargs rg -l -i "kafka.*config|consumer.*factory|deserializer" | head -10

# Look for existing consumer configurations
rg -n -C3 "ConsumerFactory|KafkaListenerContainerFactory|JsonDeserializer" --type=java

# Check for existing Kafka properties or application configuration
find . -name "application*.yml" -o -name "application*.properties" -o -name "bootstrap*.yml" -o -name "bootstrap*.properties" | head -5 | xargs cat 2>/dev/null | grep -i kafka || echo "No Kafka properties found"

Length of output: 4079


Let me check the inventory service's current Kafka configuration:


🏁 Script executed:

#!/bin/bash
# Check inventory service's application properties
find inventory-service -name "application*.yml" -o -name "application*.properties" -o -name "bootstrap*.yml" -o -name "bootstrap*.properties" | xargs cat 2>/dev/null

# Look for any existing Kafka configuration classes in inventory service
fd -e java . inventory-service | xargs rg -l -i "kafka.*config|consumer.*factory" | head -5

# Check what other Kafka configurations exist in the project
rg -n -C5 "JsonDeserializer|ConsumerFactory" inventory-service --type=java

Length of output: 1748


Perfect! I can see that the inventory service doesn't have any Kafka consumer configuration in its application.properties, which means it's using the default StringDeserializer. Let me configure the Kafka consumer factory to use JsonDeserializer for ProductDto.

Here's the solution - you need to add Kafka consumer configuration to your inventory-service/src/main/resources/application.properties:


🏁 Script executed:

#!/bin/bash
# Check the current application.properties content and show where to add Kafka config
echo "=== Current inventory-service application.properties ==="
cat inventory-service/src/main/resources/application.properties | tail -10

echo -e "\n=== Kafka Configuration to Add ==="
cat << 'EOF'

######## Kafka Configuration  #########
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos,com.example.inventoryservice.model.payload
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
EOF

Length of output: 789


Let me find the actual configuration file location for the inventory service:


🏁 Script executed:

#!/bin/bash
# Find all configuration files in inventory service
echo "=== Finding configuration files in inventory-service ==="
find inventory-service -name "application*" -o -name "bootstrap*" | sort

# Check if there are any resources directories
echo -e "\n=== Resources directories in inventory-service ==="
find inventory-service -type d -name resources

# List contents of resources directories if they exist
echo -e "\n=== Contents of resources directories ==="
find inventory-service -type d -name resources -exec ls -la {} \; 2>/dev/null

Length of output: 730


Now I can see the inventory service uses profile-specific properties files. Let me check their contents and provide the proper Kafka configuration:


🏁 Script executed:

#!/bin/bash
# Check the contents of both properties files
echo "=== Contents of application-local.properties ==="
cat inventory-service/src/main/resources/application-local.properties

echo -e "\n=== Contents of application-test.properties ==="
cat inventory-service/src/test/resources/application-test.properties

# Also check what the earlier output was from (it might have been concatenated from multiple files)
echo -e "\n=== Checking if there are other config files we missed ==="
find . -name "*.properties" -path "*/inventory-service/*" -exec echo "File: {}" \; -exec head -5 {} \; -exec echo "" \;

Length of output: 2658


@rajadilipkolli Perfect! I can see that the inventory service uses profile-specific properties files. To configure Kafka to properly deserialize ProductDto instances, you have two options:

Option 1: Create a base application.properties file (Recommended)

Create inventory-service/src/main/resources/application.properties with the following Kafka configuration:

######## Kafka Configuration  #########
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.common.dtos,com.example.inventoryservice.model.payload
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Option 2: Add to existing application-local.properties

Alternatively, you can add the same Kafka configuration to your existing inventory-service/src/main/resources/application-local.properties file.

Key Points:

  1. JsonDeserializer: This replaces the default StringDeserializer and will automatically convert JSON payloads to ProductDto objects
  2. Trusted Packages: I've configured both com.example.common.dtos (for OrderDto) and com.example.inventoryservice.model.payload (for ProductDto) as trusted packages for security
  3. Consistency: This follows the same pattern used by your payment-service and order-service

For Testing:

Also add the same configuration to inventory-service/src/test/resources/application-test.properties to ensure your integration tests work correctly.

With this configuration, your current KafkaListenerConfig code will work perfectly - Spring Kafka will automatically deserialize the JSON payload to ProductDto before passing it to your onSaveProductEvent method, and the @Valid annotation will trigger validation as expected.

The JsonProcessingException in the method signature can actually be removed now since the deserialization happens at the framework level, but keeping it won't cause any issues if you prefer to leave it for backward compatibility.
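
If the checked exception is dropped, the listener method from this PR would reduce to roughly the following sketch (same annotations as above, only the throws clause removed):

@KafkaListener(id = "products", topics = AppConstants.PRODUCT_TOPIC, groupId = "product")
public void onSaveProductEvent(@Payload @Valid ProductDto productDto) {
    log.info("Received Product: {}", productDto);
    productManageService.manage(productDto);
}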


🧠 Learnings used
Learnt from: CR
PR: rajadilipkolli/spring-boot-microservices-series-v2#0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-09-28T10:03:05.058Z
Learning: If event payloads change, update all affected Kafka consumers/producers and prefer backward-compatible migrations


@DltHandler