Skip to content

Commit 03d1f51

Browse files
committed
Add Cloud Events support to Spring Integration
Introduces Cloud Events v1.0 specification support including message converters, transformers, and utilities. Key components added: - CloudEventMessageConverter for message format conversion - ToCloudEventTransformer for transforming messages to Cloud Events - MessageBinaryMessageReader/Writer for binary format handling - CloudEventProperties for configuration management - Header pattern matching utilities for flexible event mapping - Add reference docs and what's-new paragraph
1 parent c066338 commit 03d1f51

19 files changed

+2803
-0
lines changed

build.gradle

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ ext {
5353
avroVersion = '1.12.0'
5454
awaitilityVersion = '4.3.0'
5555
camelVersion = '4.14.0'
56+
cloudEventsVersion = '4.0.1'
5657
commonsDbcp2Version = '2.13.0'
5758
commonsIoVersion = '2.20.0'
5859
commonsNetVersion = '3.12.0'
@@ -472,6 +473,24 @@ project('spring-integration-cassandra') {
472473
}
473474
}
474475

476+
project('spring-integration-cloudevents') {
477+
description = 'Spring Integration Cloud Events Support'
478+
479+
dependencies {
480+
api "io.cloudevents:cloudevents-core:$cloudEventsVersion"
481+
optionalApi "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion"
482+
483+
optionalApi("io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion") {
484+
exclude group: 'org.apache.avro', module: 'avro'
485+
}
486+
optionalApi "org.apache.avro:avro:$avroVersion"
487+
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion"
488+
optionalApi 'org.jspecify:jspecify'
489+
490+
testImplementation 'org.springframework.amqp:spring-rabbit-test'
491+
}
492+
}
493+
475494
project('spring-integration-core') {
476495
description = 'Spring Integration Core'
477496

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.v1;
18+
19+
import java.nio.charset.StandardCharsets;
20+
21+
import io.cloudevents.CloudEvent;
22+
import io.cloudevents.SpecVersion;
23+
import io.cloudevents.core.CloudEventUtils;
24+
import io.cloudevents.core.format.EventFormat;
25+
import io.cloudevents.core.message.MessageReader;
26+
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
27+
import io.cloudevents.core.message.impl.MessageUtils;
28+
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.MessageHeaders;
31+
import org.springframework.messaging.converter.MessageConverter;
32+
import org.springframework.util.Assert;
33+
34+
/**
35+
* A {@link MessageConverter} that can translate to and from a {@link Message
36+
* Message<byte[]>} or {@link Message Message<String>} and a {@link CloudEvent}.
37+
*
38+
* @author Dave Syer
39+
* @author Glenn Renfro
40+
*
41+
* @since 7.0
42+
*/
43+
public class CloudEventMessageConverter implements MessageConverter {
44+
45+
private String cePrefix;
46+
47+
public CloudEventMessageConverter(String cePrefix) {
48+
this.cePrefix = cePrefix;
49+
}
50+
51+
public CloudEventMessageConverter() {
52+
this(CloudEventsHeaders.CE_PREFIX);
53+
}
54+
55+
@Override
56+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
57+
Assert.state(CloudEvent.class.isAssignableFrom(targetClass), "Target class must be a CloudEvent");
58+
return createMessageReader(message).toEvent();
59+
}
60+
61+
@Override
62+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
63+
Assert.state(payload instanceof CloudEvent, "Payload must be a CloudEvent");
64+
return CloudEventUtils.toReader((CloudEvent) payload).read(new MessageBuilderMessageWriter(headers, this.cePrefix));
65+
}
66+
67+
private MessageReader createMessageReader(Message<?> message) {
68+
return MessageUtils.parseStructuredOrBinaryMessage(//
69+
() -> contentType(message.getHeaders()), //
70+
format -> structuredMessageReader(message, format), //
71+
() -> version(message.getHeaders()), //
72+
version -> binaryMessageReader(message, version) //
73+
);
74+
}
75+
76+
private String version(MessageHeaders message) {
77+
if (message.containsKey(CloudEventsHeaders.SPEC_VERSION)) {
78+
return message.get(CloudEventsHeaders.SPEC_VERSION).toString();
79+
}
80+
return null;
81+
}
82+
83+
private MessageReader binaryMessageReader(Message<?> message, SpecVersion version) {
84+
return new MessageBinaryMessageReader(version, message.getHeaders(), getBinaryData(message), this.cePrefix);
85+
}
86+
87+
private MessageReader structuredMessageReader(Message<?> message, EventFormat format) {
88+
return new GenericStructuredMessageReader(format, getBinaryData(message));
89+
}
90+
91+
private String contentType(MessageHeaders message) {
92+
if (message.containsKey(MessageHeaders.CONTENT_TYPE)) {
93+
return message.get(MessageHeaders.CONTENT_TYPE).toString();
94+
}
95+
if (message.containsKey(CloudEventsHeaders.CONTENT_TYPE)) {
96+
return message.get(CloudEventsHeaders.CONTENT_TYPE).toString();
97+
}
98+
return null;
99+
}
100+
101+
private byte[] getBinaryData(Message<?> message) {
102+
Object payload = message.getPayload();
103+
if (payload instanceof byte[] bytePayload) {
104+
return bytePayload;
105+
}
106+
else if (payload instanceof String stringPayload) {
107+
return stringPayload.getBytes(StandardCharsets.UTF_8);
108+
}
109+
throw new IllegalStateException("Message payload must be a byte array or a String");
110+
}
111+
112+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.v1;
18+
19+
/**
20+
* Constants for Cloud Events header names.
21+
*
22+
* @author Glenn Renfro
23+
*
24+
* @since 7.0
25+
*/
26+
public final class CloudEventsHeaders {
27+
28+
public static final String CE_PREFIX = "ce-";
29+
30+
public static final String SPEC_VERSION = CE_PREFIX + "specversion";
31+
32+
public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype";
33+
34+
private CloudEventsHeaders() {
35+
36+
}
37+
38+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.v1;
18+
19+
import java.util.Map;
20+
import java.util.function.BiConsumer;
21+
22+
import io.cloudevents.SpecVersion;
23+
import io.cloudevents.core.data.BytesCloudEventData;
24+
import io.cloudevents.core.impl.StringUtils;
25+
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
26+
27+
/**
28+
* Utility for converting maps (message headers) to `CloudEvent` contexts.
29+
*
30+
* @author Dave Syer
31+
* @author Glenn Renfro
32+
*
33+
* @since 7.0
34+
*
35+
*/
36+
public class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
37+
private final String cePrefix;
38+
39+
private final Map<String, Object> headers;
40+
41+
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, byte[] payload, String cePrefix) {
42+
super(version, payload == null ? null : BytesCloudEventData.wrap(payload));
43+
this.headers = headers;
44+
this.cePrefix = cePrefix;
45+
}
46+
47+
public MessageBinaryMessageReader(SpecVersion version, Map<String, Object> headers, String cePrefix) {
48+
this(version, headers, null, cePrefix);
49+
}
50+
51+
@Override
52+
protected boolean isContentTypeHeader(String key) {
53+
return org.springframework.messaging.MessageHeaders.CONTENT_TYPE.equalsIgnoreCase(key);
54+
}
55+
56+
@Override
57+
protected boolean isCloudEventsHeader(String key) {
58+
return key != null && key.length() > this.cePrefix.length() && StringUtils.startsWithIgnoreCase(key, this.cePrefix);
59+
}
60+
61+
@Override
62+
protected String toCloudEventsKey(String key) {
63+
return key.substring(this.cePrefix.length()).toLowerCase();
64+
}
65+
66+
@Override
67+
protected void forEachHeader(BiConsumer<String, Object> fn) {
68+
this.headers.forEach((k, v) -> fn.accept(k, v));
69+
}
70+
71+
@Override
72+
protected String toCloudEventsValue(Object value) {
73+
return value.toString();
74+
}
75+
76+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2025-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.cloudevents.v1;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import io.cloudevents.CloudEventData;
23+
import io.cloudevents.SpecVersion;
24+
import io.cloudevents.core.format.EventFormat;
25+
import io.cloudevents.core.message.MessageWriter;
26+
import io.cloudevents.rw.CloudEventContextWriter;
27+
import io.cloudevents.rw.CloudEventRWException;
28+
import io.cloudevents.rw.CloudEventWriter;
29+
30+
import org.springframework.messaging.Message;
31+
import org.springframework.messaging.support.MessageBuilder;
32+
33+
/**
34+
* Internal utility class for copying <code>CloudEvent</code> context to a map (message
35+
* headers).
36+
*
37+
* @author Dave Syer
38+
* @author Glenn Renfro
39+
*
40+
* @since 7.0
41+
*/
42+
public class MessageBuilderMessageWriter
43+
implements CloudEventWriter<Message<byte[]>>, MessageWriter<MessageBuilderMessageWriter, Message<byte[]>> {
44+
45+
private final String cePrefix;
46+
47+
private final Map<String, Object> headers = new HashMap<>();
48+
49+
public MessageBuilderMessageWriter(Map<String, Object> headers, String cePrefix) {
50+
this.headers.putAll(headers);
51+
this.cePrefix = cePrefix;
52+
}
53+
54+
public MessageBuilderMessageWriter() {
55+
this.cePrefix = CloudEventsHeaders.CE_PREFIX;
56+
}
57+
58+
@Override
59+
public Message<byte[]> setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
60+
this.headers.put(CloudEventsHeaders.CONTENT_TYPE, format.serializedContentType());
61+
return MessageBuilder.withPayload(value).copyHeaders(this.headers).build();
62+
}
63+
64+
@Override
65+
public Message<byte[]> end(CloudEventData value) throws CloudEventRWException {
66+
return MessageBuilder.withPayload(value == null ? new byte[0] : value.toBytes()).copyHeaders(this.headers).build();
67+
}
68+
69+
@Override
70+
public Message<byte[]> end() {
71+
return MessageBuilder.withPayload(new byte[0]).copyHeaders(this.headers).build();
72+
}
73+
74+
@Override
75+
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
76+
this.headers.put(this.cePrefix + name, value);
77+
return this;
78+
}
79+
80+
@Override
81+
public MessageBuilderMessageWriter create(SpecVersion version) {
82+
this.headers.put(this.cePrefix + "specversion", version.toString());
83+
return this;
84+
}
85+
86+
}

0 commit comments

Comments
 (0)