PulsarMQ对接指南
:::caution
仅支持本地K8S版本
:::
本文介绍如何使用Java SDK,完成对领域事件的PulsarMQ消息接收。
步骤一: 准备工作
1.下载Java SDK。
2.创建Maven工程,将SDK文件添加为依赖项导入Project Library,并且引入Pulsar的客户端依赖,pom.xml配置如下:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.10.2</version>
</dependency>
3.在AIoT平台创建属于您的访问凭证,并获取对应的AccessKeyId和AccessKeySecret。
:::tip
JAVA SDK版本说明:
Version:1.0.0
更新日期:
2024-06-25使用说明:
请使用JDK11及以上版本,否则可能会出现JDK版本兼容问题
:::
步骤二: 接收消息
public class PulsarConsumerDemo {
public static void main(String[] args) throws IOException {
/* 1.创建认证信息 */
String accessKeyId = "sdk-demo-accessKeyId"; // 访问凭证里面的accessKeyId
String accessKeySecret = "sdk-demo-accessKeySecret"; // 访问凭证里面的accessKeySecret
DasAuthentication dasAuth = new DasAuthentication();
Map<String, String> authMap = new HashMap<>();
authMap.put("appId", accessKeyId);
authMap.put("appSecret", accessKeySecret);
dasAuth.configure(authMap);
/*
也可以使用authParamString的方式如下:
dasAuth.configure("{\"appId\":\"sdk-demo-accessKeyId\",\"appSecret\":\"sdk-demo-accessKeySecret\"}");
*/
/* 2.连接pulsar,并且传入das认证 */
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://host:port")
.authentication(dasAuth)
.build();
/* 3.创建消费者,开始订阅消息 */
// 需要订阅的领域事件topic
String openTopic = "sdk-demo-topic";
// 消费者名称
String consumerName = "sdk-demo-consumerName";
// 订阅组名称
String subscriptionName = "sdk-demo-subscriptionName";
// 消费者消费模式
SubscriptionType subscriptionType = SubscriptionType.Shared;
try {
client.newConsumer().subscriptionType(subscriptionType)
.topic(openTopic)
.consumerName(consumerName)
.messageListener(new MessageListener<>() {
private static final long serialVersionUID = 1L;
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
System.out.println(consumer.getConsumerName() + " - " + System.currentTimeMillis() + " -接收到消息: "
+ new String(msg.getData(), Charset.defaultCharset()));
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
System.out.println("提交消息异常了");
e.printStackTrace();
}
}
}).subscriptionName(subscriptionName).subscribe();
System.in.read();
}catch (Exception e){
System.out.println("创建消费者异常了, customer = " + consumerName);
e.printStackTrace();
}
// 关闭连接 代码省略......
}
}
关于更多客户端的使用可以参考Pulsar官网的使用。
修改于 3 个月前