diff --git a/aop/java/pom.xml b/aop/java/pom.xml new file mode 100644 index 0000000..100d212 --- /dev/null +++ b/aop/java/pom.xml @@ -0,0 +1,99 @@ + + + 4.0.0 + + org.example + aop-cloud-multiBundleDemo + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + + com.rabbitmq + amqp-client + 5.18.0 + + + org.slf4j + slf4j-api + + + + + org.projectlombok + lombok + 1.18.32 + + + info.picocli + picocli + 4.7.0 + + + org.apache.logging.log4j + log4j-slf4j2-impl + 2.21.1 + + + com.fasterxml.jackson.core + jackson-core + 2.16.1 + + + com.fasterxml.jackson.core + jackson-annotations + 2.16.1 + + + com.fasterxml.jackson.core + jackson-databind + 2.16.1 + + + org.apache.commons + commons-lang3 + 3.13.0 + + + + + aop-multiBundleDemo + + + org.apache.maven.plugins + maven-assembly-plugin + 2.3 + + false + + jar-with-dependencies + + + + true + lib/ + org.example.RabbitMQDemo + + + + + + make-assembly + package + + assembly + + + + + + + + diff --git a/aop/java/src/main/java/org/example/RabbitMQApikeyConsumerDemo.java b/aop/java/src/main/java/org/example/RabbitMQApikeyConsumerDemo.java new file mode 100644 index 0000000..2e193f1 --- /dev/null +++ b/aop/java/src/main/java/org/example/RabbitMQApikeyConsumerDemo.java @@ -0,0 +1,117 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.example; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.LongString; +import com.rabbitmq.client.SaslMechanism; +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; +import com.rabbitmq.client.impl.LongStringHelper; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * RabbitMQ messaging test. + */ +@Slf4j +public class RabbitMQApikeyConsumerDemo { + + private String apikeyHost = "host"; // Replace with your actual RabbitMQ host + private String port = "5671"; + private String virtualHost = "aoptest"; // please create the namespace public/aoptest, the namespace should only has one bundle + private String exchange = "ex-fanout"; + private String queue = "qu"; + private int msgCount = 100; + private DefaultCredentialsRefreshService refreshService; + + private Connection getConnection() throws Exception { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(apikeyHost); + connectionFactory.setPort(Integer.parseInt(port)); + connectionFactory.useSslProtocol(); + connectionFactory.setVirtualHost(virtualHost); + final String apikeyToken = "apikeyToken"; // Replace with your actual API key token + + connectionFactory.setCredentialsProvider(new CredentialsProvider() { + @Override + public String getUsername() { + return null; + } + + @Override + public String getPassword() { + return apikeyToken; + } + }); + connectionFactory.setSaslConfig(strings -> new SaslMechanism() { + @Override + public String getName() { + return "token"; + } + + @Override + public LongString handleChallenge(LongString longString, String s, String s1) { + return LongStringHelper.asLongString(s1); + } + }); + refreshService = new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder().build(); + connectionFactory.setCredentialsRefreshService(refreshService); + + return connectionFactory.newConnection(); + } + + public void start() throws Exception { + Connection connection = getConnection(); + log.info("get connection"); + var channel = connection.createChannel(); + String ex = exchange; + String qu = queue; + + channel.exchangeDeclare(ex, BuiltinExchangeType.FANOUT, true); + channel.queueDeclare(qu, true, false, false, null); + channel.queueBind(qu, ex, ""); + log.info("declared exchange and queue"); + + CountDownLatch countDownLatch = new CountDownLatch(msgCount); + channel.basicConsume(qu, true, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + log.info("Received message: {}", new String(body)); + countDownLatch.countDown(); + } + }); + countDownLatch.await(); + log.info("consume all messages"); + + channel.close(); + connection.close(); + refreshService.close(); + } + + public static void main(String[] args) throws Exception { + log.info("start consumer demo"); + RabbitMQApikeyConsumerDemo demo = new RabbitMQApikeyConsumerDemo(); + demo.start(); + log.info("finish consumer demo"); + } + +} diff --git a/aop/java/src/main/java/org/example/RabbitMQApikeyProducerDemo.java b/aop/java/src/main/java/org/example/RabbitMQApikeyProducerDemo.java new file mode 100644 index 0000000..93ec2b1 --- /dev/null +++ b/aop/java/src/main/java/org/example/RabbitMQApikeyProducerDemo.java @@ -0,0 +1,105 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.example; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.LongString; +import com.rabbitmq.client.SaslMechanism; +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; +import com.rabbitmq.client.impl.LongStringHelper; +import lombok.extern.slf4j.Slf4j; + +/** + * RabbitMQ messaging test. + */ +@Slf4j +public class RabbitMQApikeyProducerDemo { + + private String apikeyHost = "host"; // Replace with your actual RabbitMQ host + private String port = "5671"; + private String virtualHost = "aoptest"; // please create the namespace public/aoptest, the namespace should only has one bundle + private String exchange = "ex-fanout"; + private String queue = "qu"; + private int msgCount = 100; + private DefaultCredentialsRefreshService refreshService; + + private Connection getConnection() throws Exception { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setHost(apikeyHost); + connectionFactory.setPort(Integer.parseInt(port)); + connectionFactory.useSslProtocol(); + connectionFactory.setVirtualHost(virtualHost); + final String apikeyToken = "apikeyToken"; // Replace with your actual API key token + + connectionFactory.setCredentialsProvider(new CredentialsProvider() { + @Override + public String getUsername() { + return null; + } + + @Override + public String getPassword() { + return apikeyToken; + } + }); + connectionFactory.setSaslConfig(strings -> new SaslMechanism() { + @Override + public String getName() { + return "token"; + } + + @Override + public LongString handleChallenge(LongString longString, String s, String s1) { + return LongStringHelper.asLongString(s1); + } + }); + refreshService = new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder().build(); + connectionFactory.setCredentialsRefreshService(refreshService); + + return connectionFactory.newConnection(); + } + + public void start() throws Exception { + Connection connection = getConnection(); + log.info("get connection"); + var channel = connection.createChannel(); + String ex = exchange; + String qu = queue; + + channel.exchangeDeclare(ex, BuiltinExchangeType.FANOUT, true); + channel.queueDeclare(qu, true, false, false, null); + channel.queueBind(qu, ex, ""); + log.info("declared exchange and queue"); + + for (int i = 0; i < msgCount; i++) { + channel.basicPublish(ex, "", null, ("msg " + i).getBytes()); + } + log.info("published {} messages", msgCount); + + channel.close(); + connection.close(); + refreshService.close(); + } + + public static void main(String[] args) throws Exception { + log.info("start producer demo"); + RabbitMQApikeyProducerDemo demo = new RabbitMQApikeyProducerDemo(); + demo.start(); + log.info("finish producer demo"); + } + +} diff --git a/aop/java/src/main/resources/log4j2.xml b/aop/java/src/main/resources/log4j2.xml new file mode 100644 index 0000000..db68a31 --- /dev/null +++ b/aop/java/src/main/resources/log4j2.xml @@ -0,0 +1,47 @@ + + + + + + + + + + + + + + + + + + + diff --git a/aop/python/pika/consume_demo.py b/aop/python/pika/consume_demo.py new file mode 100644 index 0000000..51dee77 --- /dev/null +++ b/aop/python/pika/consume_demo.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +import ssl + +import pika +from pika.credentials import PlainCredentials + +from token_credentials import TokenCredentials + + +def test_amqp_connection(): + """Simple AMQP connection test with amqp-test vhost""" + host = "host" + port = 5671 + vhost = "aoptest" # the namespace `public/aoptest` must be existing and the namespace bundle count must be 1 + + print(f"Testing AMQP connection to {host}:{port} with vhost: {vhost}") + + try: + token = 'TOKEN' + credentials = TokenCredentials(token) + pika.credentials.VALID_TYPES = [PlainCredentials, TokenCredentials] + + connection_params = pika.ConnectionParameters( + host=host, + port=port, + virtual_host=vhost, + credentials=credentials, # 在这里添加 credentials 参数 + ssl_options=pika.SSLOptions(ssl.create_default_context()), + socket_timeout=10 + ) + + connection = pika.BlockingConnection(connection_params) + channel = connection.channel() + print("✓ Connection successful") + + count = 0 + # Get ten messages and break out + for method_frame, properties, body in channel.consume('hello'): + + print(' [x] receive ', body) + channel.basic_ack(method_frame.delivery_tag) + + # Escape out of the loop after 10 messages + count += 1 + if count == 10: + break + + channel.close() + connection.close() + return True + + except Exception as e: + print(f"✗ Connection failed: {e}") + return False + + +if __name__ == "__main__": + test_amqp_connection() \ No newline at end of file diff --git a/aop/python/pika/produce_demo.py b/aop/python/pika/produce_demo.py new file mode 100644 index 0000000..48ab887 --- /dev/null +++ b/aop/python/pika/produce_demo.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 + +import ssl + +import pika +from pika import PlainCredentials + +from token_credentials import TokenCredentials + + +def test_amqp_connection(): + """Simple AMQP connection test with amqp-test vhost""" + host = "host" + port = 5671 + vhost = "aoptest" # the namespace `public/aoptest` must be existing and the namespace bundle count must be 1 + + print(f"Testing AMQP connection to {host}:{port} with vhost: {vhost}") + + try: + token = 'TOKEN' + + credentials = TokenCredentials(token) + pika.credentials.VALID_TYPES = [PlainCredentials, TokenCredentials] + + connection_params = pika.ConnectionParameters( + host=host, + port=port, + virtual_host=vhost, + credentials=credentials, # 在这里添加 credentials 参数 + ssl_options=pika.SSLOptions(ssl.create_default_context()), + socket_timeout=10 + ) + + connection = pika.BlockingConnection(connection_params) + channel = connection.channel() + print("✓ Connection successful") + + channel.queue_declare(queue='hello', durable=True) + for i in range(10): + channel.basic_publish( + exchange='', + routing_key='hello', + body='message - ' + str(i), + ) + print(" [x] Sent 'message'") + + channel.close() + connection.close() + return True + + except Exception as e: + print(f"✗ Connection failed: {e}") + return False + + +if __name__ == "__main__": + test_amqp_connection() \ No newline at end of file diff --git a/aop/python/pika/token_credentials.py b/aop/python/pika/token_credentials.py new file mode 100644 index 0000000..9cef0d7 --- /dev/null +++ b/aop/python/pika/token_credentials.py @@ -0,0 +1,27 @@ +class TokenCredentials: + """ + The custom token credential + """ + + # the mechanism name is token + TYPE = 'token' + + def __init__(self, token): + """ + set your token + """ + self._token = token + self.erase_on_connect = False + + def response_for(self, start): + if self.TYPE.encode('utf-8') not in start.mechanisms.split(): + return None, None + + # encode token + response_bytes = self._token.encode('utf-8') + + # return mechanism name and encoded token + return self.TYPE, response_bytes + + def erase_credentials(self): + pass