RabbitMQ对接指南
:::caution
仅支持本地DockerCompose版本
:::
本文介绍如何使用Java,完成对领域事件的RabbitMQ消息接收。
步骤一: 准备工作
1.在AIoT平台创建属于您的访问凭证,并获取对应的AccessKeyId和AccessKeySecret。
2.创建Maven项目工程,在pom.xml中引入rabbitmq客户端依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
步骤二: 接收消息
public class RabbitMqConsumerDemo {
public static void main(String[] args){
/* 1.定义连接基础信息 */
String rabbitMqHost = "xxxx"; // rabbitmq的host信息
int rabbitMqPort = 5672; // rabbitmq的端口号
String accessKeyId = "rq-demo-accessKeyId"; // 访问凭证里面的accessKeyId
String accessKeySecret = "rq-demo-accessKeySecret"; // 访问凭证里面的accessKeySecret
String vHost = "/"; // 不支持配置其他host
String openTopic = "rq-demo-topic";
String queue = openTopic + "/" + accessKeyId; // openTopic + accessKeyId
/* 2.连接MQ */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitMqHost);
factory.setPort(rabbitMqPort);
factory.setUsername(accessKeyId);
factory.setPassword(accessKeySecret);
factory.setVirtualHost(vHost);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
channel.basicQos(3);
/* 创建消费者,开始订阅消息 */
DefaultConsumer callback = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者获取到消息:" + new String(body, Charset.defaultCharset()));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queue,false, callback);
System.in.read();
} catch (IOException | TimeoutException e) {
System.out.println("创建消费者异常");
e.printStackTrace();
}
}
}
关于更多客户端的使用可以参考RabbitMq官网的使用。
:::caution
配置消费者的时候,一定要注意Queue的设置是:开放topic + 您的访问凭证AccessKeyId。
(比如: 开放topic是"rq-demo-topic",您的访问凭证是"rq-demo-accessKeyId",那么您的Queue应该为 "rq-demo-topic/rq-demo-accessKeyId")
:::
修改于 3 个月前