-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add Cloud Events support to Spring Integration #10448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
b377397
to
03d1f51
Compare
Introduces Cloud Events v1.0 specification support including message converters, transformers, and utilities. Key components added: - CloudEventMessageConverter for message format conversion - ToCloudEventTransformer for transforming messages to Cloud Events - MessageBinaryMessageReader/Writer for binary format handling - CloudEventProperties for configuration management - Header pattern matching utilities for flexible event mapping - Add reference docs and what's-new paragraph
03d1f51
to
02a1329
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some at a glance review.
Thank you!
} | ||
|
||
project('spring-integration-cloudevents') { | ||
description = 'Spring Integration Cloud Events Support' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CloudEvents
?
} | ||
optionalApi "org.apache.avro:avro:$avroVersion" | ||
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion" | ||
optionalApi 'org.jspecify:jspecify' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is something we need to specify manually.
It has to come from Spring Core.
exclude group: 'org.apache.avro', module: 'avro' | ||
} | ||
optionalApi "org.apache.avro:avro:$avroVersion" | ||
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these all optional
deps really necessary for our production code?
Maybe test
scope instead?
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion" | ||
optionalApi 'org.jspecify:jspecify' | ||
|
||
testImplementation 'org.springframework.amqp:spring-rabbit-test' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to test this stuff against RabbitMQ ?
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.cloudevents.v1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need a specific v1
package.
How about just aim for v1 as is in the root package?
*/ | ||
public class CloudEventMessageConverter implements MessageConverter { | ||
|
||
private String cePrefix; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
|
||
@Override | ||
public Object fromMessage(Message<?> message, Class<?> targetClass) { | ||
Assert.state(CloudEvent.class.isAssignableFrom(targetClass), "Target class must be a CloudEvent"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just ignore the class and state that in the Javadocs.
import static org.assertj.core.api.Assertions.assertThatIllegalStateException; | ||
import static org.assertj.core.api.Assertions.catchIllegalStateException; | ||
|
||
public class CloudEventMessageConverterTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
..Tests
, please.
|
||
[[x7.0-cloudevents]] | ||
=== CloudEvents | ||
The CloudEvent transformer converts Spring Integration messages into CloudEvent compliant messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blank line after title.
=== CloudEvents | ||
The CloudEvent transformer converts Spring Integration messages into CloudEvent compliant messages. | ||
This transformer provides support for the CloudEvents specification v1.0 with configurable output formats, header pattern matching, and extension management. | ||
See xref:cloudevents/cloudevents-transform.adoc[] for more information. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why it cannot be just top-level cloudevents.adoc
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for a lengthy review and some doubts I've expressed.
I addition, what are your thoughts about content of this package in Spring Cloud Function: https://github.com/spring-cloud/spring-cloud-function/tree/main/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent ?
I mean transformer does the trick indeed, but only from an integration flow context.
How about the way to be able to construct CloudEvent
programmatically?
Or just existing SDK API is enough to deal with?
return CloudEventUtils.toReader((CloudEvent) payload).read(new MessageBuilderMessageWriter(headers, this.cePrefix)); | ||
} | ||
|
||
private MessageReader createMessageReader(Message<?> message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like all of methods starting with this has to be static
.
if (message.containsKey(CloudEventsHeaders.SPEC_VERSION)) { | ||
return message.get(CloudEventsHeaders.SPEC_VERSION).toString(); | ||
} | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like @Nullability
is not a part of this package yet.
* | ||
*/ | ||
public class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> { | ||
private final String cePrefix; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every member of the class has to be surrounded with blank lines.
public ToCloudEventTransformerExtensions(MessageHeaders headers, @Nullable String patterns) { | ||
this.cloudEventExtensions = new HashMap<>(); | ||
headers.keySet().forEach(key -> { | ||
Boolean result = HeaderPatternMatcher.categorizeHeader(key, patterns); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this logic here.
We can perform filtering (and categorization) only once in the transformer.
And then push into this extensions only those headers which are really extensions.
From there I don't think it makes sense to make this class as public
* | ||
* @since 7.0 | ||
*/ | ||
public final class HeaderPatternMatcher { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is fully redundant.
Make the pattern
property of the transformer as String @Nullable... patterns
private Message<String> convertToJsonMessage(CloudEvent cloudEvent, MessageHeaders originalHeaders) { | ||
JsonFormat jsonFormat = new JsonFormat(); | ||
String jsonContent = new String(jsonFormat.serialize(cloudEvent)); | ||
return buildStringMessage(jsonContent, originalHeaders, "application/json"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have to convert to String
?
Why just byte[]
for serialized XML/JSON is not enough?
|
||
@Override | ||
public String getComponentType() { | ||
return "to-cloud-transformer"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is not in a core
module any more, the type has to come with prefix.
So, I believe this one should be like: ce:to-cloudevents-transformer
* | ||
* @since 7.0 | ||
*/ | ||
public class CloudEventProperties { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure in this class.
Why all these properties cannot be on the transformer itself?
More over: how about expressions against the request message to determine all these options at runtime?
I think I had expressions in my original PoC: #3246.
* | ||
* @since 7.0 | ||
*/ | ||
public final class CloudEventsHeaders { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not very convinced that we need this class at all.
Especially with the case when we can change the prefix.
|
||
[[cloudevent-transformer-conversion-default]] | ||
==== DEFAULT | ||
The default format produces standard CloudEvent messages using Spring's CloudEvent support. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blank like after section title.
Introduces Cloud Events v1.0 specification support including message converters, transformers, and utilities.
Key components added: