From 65433498372117a38c0ee8b0eead20eae6ba0f2f Mon Sep 17 00:00:00 2001 From: Minyoung Noh Date: Thu, 5 Jun 2025 11:03:45 +0900 Subject: [PATCH 1/4] Add MQTT5 sample application with integration flows and tests Signed-off-by: Minyoung Noh #323 --- README.md | 1 + basic/mqtt5/README.md | 29 ++++ .../samples/mqtt5/Application.java | 135 ++++++++++++++++++ .../mqtt5/MqttStringToBytesConverter.java | 51 +++++++ basic/mqtt5/src/main/resources/logback.xml | 24 ++++ .../samples/mqtt5/ApplicationTest.java | 60 ++++++++ .../samples/mqtt5/BrokerRunning.java | 86 +++++++++++ build.gradle | 30 ++++ 8 files changed, 416 insertions(+) create mode 100644 basic/mqtt5/README.md create mode 100644 basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java create mode 100644 basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java create mode 100644 basic/mqtt5/src/main/resources/logback.xml create mode 100644 basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java create mode 100644 basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java diff --git a/README.md b/README.md index ad055b217..86f4bd781 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ This is a good place to get started. The samples here are technically motivated * **jpa** - Shows the usage of the JPA Components * **mail** - Example showing **IMAP** and **POP3** support * **mqtt** - Demonstrates the functionality of inbound and outbound **MQTT Adapters** +* **mqtt5** - Demonstrates the functionality of inbound and outbound **MQTT5 Adapters** * **mongodb** - Shows how to persist a Message payload to a **MongoDb** document store and how to read documents from **MongoDb** * **oddeven** - Example combining the functionality of **Inbound Channel Adapter**, **Filter**, **Router** and **Poller** * **quote** - Example demoing core EIP support using **Channel Adapter (Inbound and Stdout)**, **Poller** with Interval Triggers, **Service Activator** diff --git a/basic/mqtt5/README.md b/basic/mqtt5/README.md new file mode 100644 index 000000000..bbe59f7f1 --- /dev/null +++ b/basic/mqtt5/README.md @@ -0,0 +1,29 @@ +Spring Integration - MQTT5 Sample +================================ + +# Overview + +This sample demonstrates basic functionality of the **Spring Integration MQTT5 Adapters**. + +It assumes a broker is running on localhost on port 1883. + +Once the application is started, you enter some text on the command prompt and a message containing that entered text is +dispatched to the MQTT topic. In return that message is retrieved by Spring Integration and then logged. + +# How to Run the Sample + +If you imported the example into your IDE, you can just run class **org.springframework.integration.samples.mqtt5.Application**. +For example in [SpringSource Tool Suite](https://www.springsource.com/developer/sts) (STS) do: + +* Right-click on SampleSimple class --> Run As --> Spring Boot App + +(or run from the boot console). + +Alternatively, you can start the sample from the command line: + +* ./gradlew :mqtt5:run + +Enter some data (e.g. `foo`) on the console; you will see `foo sent to MQTT5, received from MQTT5` + +Ctrl-C to terminate. + diff --git a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java new file mode 100644 index 000000000..19f620991 --- /dev/null +++ b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java @@ -0,0 +1,135 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.mqtt5; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.handler.LoggingHandler; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.integration.stream.CharacterStreamReadingMessageSource; +import org.springframework.messaging.MessageHandler; + +import java.nio.charset.StandardCharsets; + +/** + * Starts the Spring Context and will initialize the Spring Integration message flow. + * + * @author Minyoung Noh + * + */ +@SpringBootApplication +public class Application { + + private static final Log LOGGER = LogFactory.getLog(Application.class); + + /** + * Load the Spring Integration Application Context + * + * @param args - command line arguments + */ + public static void main(final String... args) { + + LOGGER.info("\n=========================================================" + + "\n " + + "\n Welcome to Spring Integration! " + + "\n " + + "\n For more information please visit: " + + "\n https://spring.io/projects/spring-integration " + + "\n " + + "\n========================================================="); + + LOGGER.info("\n=========================================================" + + "\n " + + "\n This is the MQTT5 Sample - " + + "\n " + + "\n Please enter some text and press return. The entered " + + "\n Message will be sent to the configured MQTT topic, " + + "\n then again immediately retrieved from the Message " + + "\n Broker and ultimately printed to the command line. " + + "\n " + + "\n========================================================="); + + SpringApplication.run(Application.class, args); + } + + @Bean + public MqttConnectionOptions mqttConnectionOptions() { + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setServerURIs(new String[]{ "tcp://localhost:1883" }); + options.setUserName("guest"); + options.setPassword("guest".getBytes(StandardCharsets.UTF_8)); + return options; + } + + // publisher + + @Bean + public IntegrationFlow mqttOutFlow() { + return IntegrationFlow.from(CharacterStreamReadingMessageSource.stdin(), + e -> e.poller(Pollers.fixedDelay(1000))) + .transform(p -> p + " sent to MQTT5") + .handle(mqttOutbound()) + .get(); + } + + @Bean + public MessageHandler mqttOutbound() { + Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(mqttConnectionOptions(), "siSamplePublisher"); + messageHandler.setAsync(true); + messageHandler.setAsyncEvents(true); + messageHandler.setDefaultTopic("siSampleTopic"); + return messageHandler; + } + + // consumer + + @Bean + public IntegrationFlow mqttInFlow() { + return IntegrationFlow.from(mqttInbound()) + .transform(p -> p + ", received from MQTT5") + .handle(logger()) + .get(); + } + + private LoggingHandler logger() { + LoggingHandler loggingHandler = new LoggingHandler("INFO"); + loggingHandler.setLoggerName("siSample"); + return loggingHandler; + } + + @Bean + public MessageProducerSupport mqttInbound() { + Mqttv5PahoMessageDrivenChannelAdapter adapter = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectionOptions(),"siSampleConsumer", "siSampleTopic"); + adapter.setCompletionTimeout(5000); + adapter.setPayloadType(String.class); + adapter.setMessageConverter(new MqttStringToBytesConverter()); + adapter.setQos(1); + return adapter; + } + +} diff --git a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java new file mode 100644 index 000000000..2b0353a1e --- /dev/null +++ b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/MqttStringToBytesConverter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.samples.mqtt5; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; + +import java.nio.charset.StandardCharsets; + +/** + * A simple {@link AbstractMessageConverter} that converts + * + * @author Minyoung Noh + * @since 5.2 + * + */ +public class MqttStringToBytesConverter extends AbstractMessageConverter { + @Override + protected boolean supports(Class clazz) { + return true; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, + Object conversionHint) { + + return message.getPayload().toString().getBytes(StandardCharsets.UTF_8); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + + return new String((byte[]) payload); + } + +} diff --git a/basic/mqtt5/src/main/resources/logback.xml b/basic/mqtt5/src/main/resources/logback.xml new file mode 100644 index 000000000..11bc14c60 --- /dev/null +++ b/basic/mqtt5/src/main/resources/logback.xml @@ -0,0 +1,24 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + diff --git a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java new file mode 100644 index 000000000..47ec70cd0 --- /dev/null +++ b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java @@ -0,0 +1,60 @@ +package org.springframework.integration.samples.mqtt5; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.springframework.integration.test.mock.MockIntegration.messageArgumentCaptor; +import static org.springframework.integration.test.mock.MockIntegration.mockMessageHandler; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.test.context.MockIntegrationContext; +import org.springframework.integration.test.context.SpringIntegrationTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest +@SpringIntegrationTest +public class ApplicationTest { + + @BeforeAll + static void setupBroker() { + BrokerRunning brokerRunning = BrokerRunning.isRunning(1883); + } + + @Autowired + private MockIntegrationContext mockIntegrationContext; + + @Autowired + private IntegrationFlow mqttOutFlow; + + @Test + void testMqttFlow() throws InterruptedException { + ArgumentCaptor> captor = messageArgumentCaptor(); + CountDownLatch receiveLatch = new CountDownLatch(1); + MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown()); + + mockIntegrationContext.substituteMessageHandlerFor( + "mqttInFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1", + mockMessageHandler); + + mqttOutFlow.getInputChannel().send(new GenericMessage<>("foo")); + + assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue(); + verify(mockMessageHandler).handleMessage(any()); + assertThat(captor.getValue().getPayload()) + .isEqualTo("foo sent to MQTT5, received from MQTT5"); + } +} diff --git a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java new file mode 100644 index 000000000..fe7757a51 --- /dev/null +++ b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/BrokerRunning.java @@ -0,0 +1,86 @@ +/* + * Copyright 2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.mqtt5; + +import static org.junit.Assume.assumeNoException; +import static org.junit.Assume.assumeTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.paho.mqttv5.client.IMqttClient; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + + +/** + * @author Minyoung Noh + * + * @since 5.2 + * + */ +public class BrokerRunning extends TestWatcher { + + private static final Log logger = LogFactory.getLog(BrokerRunning.class); + + // Static so that we only test once on failure: speeds up test suite + private static final Map brokerOnline = new HashMap<>(); + + private final int port; + + private BrokerRunning(int port) { + this.port = port; + brokerOnline.put(port, true); + } + + @Override + public Statement apply(Statement base, Description description) { + assumeTrue(brokerOnline.get(port)); + String url = "tcp://localhost:" + port; + IMqttClient client = null; + try { + client = new MqttClient(url, "junit-" + System.currentTimeMillis()); + client.connect(); + } + catch (MqttException e) { + logger.warn("Tests not running because no broker on " + url + ":", e); + assumeNoException(e); + } + finally { + if (client != null) { + try { + client.disconnect(); + client.close(); + } + catch (MqttException e) { + } + } + } + return super.apply(base, description); + } + + + public static BrokerRunning isRunning(int port) { + return new BrokerRunning(port); + } + +} diff --git a/build.gradle b/build.gradle index 268a83eaf..bf35cc280 100644 --- a/build.gradle +++ b/build.gradle @@ -692,6 +692,36 @@ project('mqtt') { } +project('mqtt5') { + description = 'MQTT5 Basic Sample' + + apply plugin: 'org.springframework.boot' + + dependencies { + api 'org.springframework.boot:spring-boot-starter-integration' + api 'org.springframework.integration:spring-integration-stream' + api 'org.springframework.integration:spring-integration-mqtt' + api 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5' + + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation "org.springframework.integration:spring-integration-test" + } + + springBoot { + mainClass = 'org.springframework.integration.samples.mqtt5.Application' + } + + task run(type: JavaExec) { + main 'org.springframework.integration.samples.mqtt5.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } + +} + project('si4demo') { description = 'Java Configuration/DSL Sample' From e29ae9cb22778fdaf803073ba4ba19e0af30656f Mon Sep 17 00:00:00 2001 From: nmy6452 Date: Tue, 10 Jun 2025 22:42:28 +0900 Subject: [PATCH 2/4] FEAT: delete useless logback.xml Signed-off-by: nmy6452 --- basic/mqtt5/src/main/resources/logback.xml | 24 ---------------------- 1 file changed, 24 deletions(-) delete mode 100644 basic/mqtt5/src/main/resources/logback.xml diff --git a/basic/mqtt5/src/main/resources/logback.xml b/basic/mqtt5/src/main/resources/logback.xml deleted file mode 100644 index 11bc14c60..000000000 --- a/basic/mqtt5/src/main/resources/logback.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - - - - - - - - - - From 2af9a072b52de908cb4fdd04f1d3a90c7f764f1a Mon Sep 17 00:00:00 2001 From: nmy6452 Date: Tue, 10 Jun 2025 23:09:33 +0900 Subject: [PATCH 3/4] TEST: update assertions in ApplicationTest to use JUnit assertions Signed-off-by: nmy6452 --- .../integration/samples/mqtt5/ApplicationTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java index 47ec70cd0..890c092a3 100644 --- a/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java +++ b/basic/mqtt5/src/test/java/org/springframework/integration/samples/mqtt5/ApplicationTest.java @@ -1,6 +1,7 @@ package org.springframework.integration.samples.mqtt5; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.springframework.integration.test.mock.MockIntegration.messageArgumentCaptor; @@ -52,9 +53,8 @@ void testMqttFlow() throws InterruptedException { mqttOutFlow.getInputChannel().send(new GenericMessage<>("foo")); - assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertTrue(receiveLatch.await(10, TimeUnit.SECONDS)); verify(mockMessageHandler).handleMessage(any()); - assertThat(captor.getValue().getPayload()) - .isEqualTo("foo sent to MQTT5, received from MQTT5"); + assertEquals("foo sent to MQTT5, received from MQTT5",captor.getValue().getPayload()); } } From 06bf9e9a9c11315381728339d7d998a0c159db08 Mon Sep 17 00:00:00 2001 From: nmy6452 Date: Tue, 10 Jun 2025 23:13:43 +0900 Subject: [PATCH 4/4] FEAT: configure MQTT broker host and port using application properties Signed-off-by: nmy6452 --- .../integration/samples/mqtt5/Application.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java index 19f620991..f496fcd73 100644 --- a/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java +++ b/basic/mqtt5/src/main/java/org/springframework/integration/samples/mqtt5/Application.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -44,6 +45,12 @@ @SpringBootApplication public class Application { + @Value("${mqtt.broker.host:localhost}") + private String mqttHost; + + @Value("${mqtt.broker.port:1883}") + private int mqttPort; + private static final Log LOGGER = LogFactory.getLog(Application.class); /** @@ -79,7 +86,7 @@ public static void main(final String... args) { @Bean public MqttConnectionOptions mqttConnectionOptions() { MqttConnectionOptions options = new MqttConnectionOptions(); - options.setServerURIs(new String[]{ "tcp://localhost:1883" }); + options.setServerURIs(new String[]{ String.format("tcp://%s:%d", mqttHost, mqttPort) }); options.setUserName("guest"); options.setPassword("guest".getBytes(StandardCharsets.UTF_8)); return options;