官方API实战
实战之前,我们需要先搭建一个基于Maven的Springboot项目,只需要加入以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.0</version>
</dependency>
接下来使用IDEA搭建一个Maven项目:
工程创建后,我们添加Pom依赖:
这样工程就搭建完成了,接下来我们进入API实战。
5.1.基本样例
消息生产者分别通过三种方式发送消息:
- 同步发送:等待消息返回后再继续进行下面的操作。
- 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
- 单向发送:只负责发送,不管消息是否发送成功。
package Simple;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
 * Synchronous send example.
 * Each {@code send} call blocks until the broker has answered, and the returned
 * {@link SendResult} is printed before the next message is sent.
 * Created by BaiLi
 */
public class SyncProducer {
    /**
     * Sends two messages to topic "Simple" with tag "TagA" and prints each result.
     *
     * @param args unused
     * @throws MQClientException    declared by the producer start/send calls
     * @throws MQBrokerException    declared by the synchronous send call
     * @throws RemotingException    declared by the synchronous send call
     * @throws InterruptedException declared by the synchronous send call
     */
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        try {
            for (int i = 0; i < 2; i++) {
                Message msg = new Message("Simple", // topic
                        "TagA", // tag, lets consumers filter on it
                        "Simple-Sync".getBytes(StandardCharsets.UTF_8) // message body
                );
                SendResult send = producer.send(msg);
                System.out.printf(i + ".发送消息成功:%s%n", send);
            }
        } finally {
            // Release the client even when a send fails.
            producer.shutdown();
        }
    }
}
package Simple;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Asynchronous send example.
 * Sends 100 messages without waiting for each reply; the outcome of every send is
 * reported to a {@link SendCallback}, and a {@link CountDownLatch} counts the replies
 * so that {@code main} can wait (at most 5 seconds) before shutting the producer down.
 * Created by BaiLi
 */
public class AsyncProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        // One count per message; success and failure both release a count.
        CountDownLatch pending = new CountDownLatch(100);
        for (int seq = 0; seq < 100; seq++) {
            byte[] payload = "Simple-Async".getBytes(StandardCharsets.UTF_8);
            producer.send(new Message("Simple", "TagA", payload), new ReportingCallback(seq, pending));
        }
        pending.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }

    /** Prints the outcome of one asynchronous send and releases one latch count. */
    private static final class ReportingCallback implements SendCallback {
        private final int index;
        private final CountDownLatch pending;

        ReportingCallback(int index, CountDownLatch pending) {
            this.index = index;
            this.pending = pending;
        }

        @Override
        public void onSuccess(SendResult sendResult) {
            pending.countDown();
            System.out.printf("%d 消息发送成功%s%n", index, sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            pending.countDown();
            System.out.printf("%d 消息失败%s%n", index, throwable);
            throwable.printStackTrace();
        }
    }
}
package Simple;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
 * One-way send example.
 * Messages are handed to {@code sendOneway}, which returns no result, so the program
 * never learns whether a send succeeded.
 * Created by BaiLi
 */
public class OnewayProducer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        // NOTE(review): the group name "AsyncProducer" looks copied from the async example - confirm it is intended.
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        int sent = 0;
        while (sent < 10) {
            byte[] payload = "Simple-Oneway".getBytes(StandardCharsets.UTF_8);
            producer.sendOneway(new Message("Simple", "TagA", payload));
            System.out.printf("%d 消息发送完成 %n", sent);
            sent++;
        }
        // Pause for five seconds before shutting the client down.
        Thread.sleep(5000);
        producer.shutdown();
    }
}
消费者消费消息分两种:
- 拉模式:消费者主动去Broker上拉取消息。
- 推模式:消费者等待Broker把消息推送过来。
package Simple;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.HashSet;
import java.util.Set;
/**
 * Pull-mode consumer example.
 * Loops forever over every queue of the registered topics and pulls up to 32 messages
 * from each queue, starting at the offset recorded for that queue.
 * Created by BaiLi
 */
public class PullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("SimplePullConsumer");
        pullConsumer.setNamesrvAddr("192.168.43.137:9876"); // name server address
        Set<String> topics = new HashSet<>();
        topics.add("Simple");
        topics.add("TopicTest");
        pullConsumer.setRegisterTopics(topics);
        pullConsumer.start();
        while (true) { // keep pulling
            for (String topic : pullConsumer.getRegisterTopics()) {
                try {
                    // Every queue of the topic is visited in turn.
                    for (MessageQueue queue : pullConsumer.fetchSubscribeMessageQueues(topic)) {
                        pullOnce(pullConsumer, queue);
                    }
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Pulls one batch from {@code queue}, prints it, and advances the stored offset
     * when messages were found. Failures are printed and otherwise ignored so the
     * caller's loop carries on with the next queue.
     */
    private static void pullOnce(DefaultMQPullConsumer pullConsumer, MessageQueue queue) {
        try {
            PullResult pullResult = pullConsumer.pull(queue, "*", startOffset(pullConsumer, queue), 32);
            System.out.printf("循环拉取消息ing %s%n",pullResult);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    pullResult.getMsgFoundList().forEach(msg -> System.out.printf("拉取消息成功%s%n", msg));
                    // Remember where the next pull should start.
                    pullConsumer.updateConsumeOffset(queue, pullResult.getNextBeginOffset());
                    break;
                default:
                    break;
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Resolves the offset to pull from: the in-memory offset, else the stored offset,
     * else the queue's max offset, else 0.
     */
    private static long startOffset(DefaultMQPullConsumer pullConsumer, MessageQueue queue) throws MQClientException {
        long offset = pullConsumer.getOffsetStore().readOffset(queue, ReadOffsetType.READ_FROM_MEMORY);
        if (offset < 0) {
            offset = pullConsumer.getOffsetStore().readOffset(queue, ReadOffsetType.READ_FROM_STORE);
        }
        if (offset < 0) {
            offset = pullConsumer.maxOffset(queue);
        }
        return offset < 0 ? 0 : offset;
    }
}
package Simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Push-mode consumer example.
 * Subscribes to every tag of topic "Simple" and prints each message handed to the listener.
 * Created by BaiLi
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        pushConsumer.subscribe("Simple","*");
        // Print every message of the batch, then acknowledge the whole batch.
        MessageListenerConcurrently printer = (batch, context) -> {
            for (MessageExt received : batch) {
                System.out.printf("收到消息: %s%n" , received);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.setMessageListener(printer);
        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
通常情况下,用推模式比较简单。需要注意DefaultMQPullConsumer(其实现类为DefaultMQPullConsumerImpl)这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumer(其实现类为DefaultLitePullConsumerImpl)。
- LitePullConsumerSubscribe:随机获取一个queue消息
- LitePullConsumerAssign:指定一个queue消息
package Simple;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Lite pull consumer in subscribe mode.
 * Subscribes to topic "Simple" and prints whatever each {@code poll()} call returns.
 * Created by BaiLi
 */
public class PullLiteConsumer {
    /**
     * Polls forever; the consumer is shut down if the loop is left by an exception.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
        litePullConsumer.setNamesrvAddr("192.168.43.137:9876");
        litePullConsumer.subscribe("Simple");
        litePullConsumer.start();
        try {
            while (true) {
                List<MessageExt> poll = litePullConsumer.poll();
                System.out.printf("消息拉取成功 %s%n" , poll);
            }
        } finally {
            // Same clean-up as PullLiteConsumerAssign: release the client when polling fails.
            litePullConsumer.shutdown();
        }
    }
}
package Simple;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Lite pull consumer in assign mode.
 * Fetches the queues of topic "TopicTest", assigns all of them to this consumer,
 * seeks the first queue to offset 10 and then polls forever.
 * Created by BaiLi
 */
public class PullLiteConsumerAssign {
    /**
     * Runs the assign-mode example; the consumer is always shut down once it was started.
     *
     * @param args unused
     * @throws Exception any failure raised by the client calls
     */
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("SimpleLitePullConsumer");
        litePullConsumer.setNamesrvAddr("192.168.43.137:9876");
        litePullConsumer.start();
        try {
            Collection<MessageQueue> messageQueues = litePullConsumer.fetchMessageQueues("TopicTest");
            List<MessageQueue> list = new ArrayList<>(messageQueues);
            if (list.isEmpty()) {
                // Nothing to assign or seek; list.get(0) would throw below.
                System.out.printf("No message queue found for topic %s%n", "TopicTest");
                return;
            }
            litePullConsumer.assign(list);
            // Start reading the first queue at offset 10.
            litePullConsumer.seek(list.get(0), 10);
            while (true) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}
生产者:
同步发送:org.apache.rocketmq.example.simple.Producer
异步发送:org.apache.rocketmq.example.simple.AsyncProducer
单向发送:org.apache.rocketmq.example.simple.OnewayProducer
消费者:
拉模式:org.apache.rocketmq.example.simple.PullConsumer
推模式:org.apache.rocketmq.example.simple.PushConsumer
拉模式(随机获取一个queue):org.apache.rocketmq.example.simple.LitePullConsumerSubscribe.java
拉模式(指定获取一个queue):org.apache.rocketmq.example.simple.LitePullConsumerAssign.java
5.2.顺序消息
顺序消息指生产者局部有序发送到一个queue,但多个queue之间是全局无序的。
- 顺序消息生产者样例:通过MessageQueueSelector将消息有序发送到同一个queue中。
- 顺序消息消费者样例:通过MessageListenerOrderly消费者每次读取消息都只从一个queue中获取(通过加锁的方式实现)。
package Order;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Ordered-message producer example.
 * For each of 5 orders it sends 10 step messages; a {@link MessageQueueSelector} maps the
 * order id to a queue index, so all steps of one order are sent to the same queue.
 * Created by BaiLi
 */
public class OrderProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        // The third send argument (the order id) arrives here as "arg".
        MessageQueueSelector byOrderId = (queues, msg, arg) -> {
            Integer id = (Integer) arg;
            return queues.get(id % queues.size());
        };
        for (int orderId = 0; orderId < 5; orderId++) {
            for (int step = 0; step < 10; step++) {
                String text = "order_" + orderId + "_step_" + step;
                Message message = new Message("OrderTopic","TagA", text.getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(message, byOrderId, orderId);
                System.out.printf("%s%n", sendResult);
            }
        }
        producer.shutdown();
    }
}
package Order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Ordered-message consumer example.
 * Registers a {@link MessageListenerOrderly} on topic "OrderTopic" and prints the queue id
 * and body of every message it receives.
 * Created by BaiLi
 */
public class OrderConsumer {
    /**
     * Starts the consumer and returns; consumption continues on the client's own threads.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
        consumer.setNamesrvAddr("192.168.43.137:9876");
        consumer.subscribe("OrderTopic","*");
        MessageListenerOrderly printer = (batch, context) -> {
            for (MessageExt received : batch) {
                // The producers in this tutorial encode bodies as UTF-8, so decode with the
                // same charset instead of the platform default.
                String body = new String(received.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("QueueId:"+received.getQueueId() + "收到消息内容 "+body);
            }
            return ConsumeOrderlyStatus.SUCCESS;
        };
        consumer.registerMessageListener(printer);
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
生产者:
org.apache.rocketmq.example.order.Producer
消费者:
org.apache.rocketmq.example.order.Consumer
5.3.广播消息
广播消息并没有特定的消息生产者样例,这是因为一条消息是否被广播,取决于消费者的消费模式(广播模式或集群模式),而不取决于消息的发送方式。
- MessageModel.BROADCASTING:广播消息。一条消息会发给所有订阅了对应主题的消费者,不管消费者是不是同一个消费者组。
- MessageModel.CLUSTERING:集群消息。每一条消息只会被同一个消费者组中的一个实例消费。
package broadcast;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import java.util.List;
/**
 * Broadcast consumption example.
 * Identical to an ordinary push consumer except that the message model is set to
 * {@link MessageModel#BROADCASTING}; switch to {@code MessageModel.CLUSTERING} to compare
 * with cluster consumption.
 * Created by BaiLi
 */
public class BroadcastConsumer {
    /**
     * Starts the consumer and returns; consumption continues on the client's own threads.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");
        consumer.setNamesrvAddr("192.168.43.137:9876");
        consumer.subscribe("simple","*");
        consumer.setMessageModel(MessageModel.BROADCASTING); // broadcast mode
        MessageListenerConcurrently printer = (batch, context) -> {
            for (MessageExt received : batch) {
                // The producers in this tutorial encode bodies as UTF-8, so decode with the
                // same charset instead of the platform default.
                String body = new String(received.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("QueueId:"+received.getQueueId() + "收到消息内容 "+body);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        consumer.registerMessageListener(printer);
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
消费者样例:
org.apache.rocketmq.example.broadcast.PushConsumer
5.4.延迟消息
延迟消息实现的效果是:调用producer.send方法后,消息会立即发送到Broker,但不会立即投递给消费者,而是等待一段指定的时间之后才对消费者可见。这是RocketMQ特有的一个功能。
- message.setDelayTimeLevel(3):预定日程定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
- msg.setDelayTimeMs(10L):指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。
package schedule;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
/**
 * Delay-level producer example.
 * Sends two messages to topic "Schedule" with delay level 3 and prints the local time of
 * each send.
 * Created by BaiLi
 */
public class ScheduleProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        int i = 0;
        while (i < 2) {
            byte[] body = "ScheduleProducer".getBytes(StandardCharsets.UTF_8);
            Message delayed = new Message("Schedule", "TagA", body);
            // Levels 1..18 map to 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,
            // so level 3 means 10 seconds.
            delayed.setDelayTimeLevel(3);
            producer.send(delayed);
            System.out.printf(i + ".发送消息成功:%s%n", LocalTime.now());
            i++;
        }
        producer.shutdown();
    }
}
package schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
/**
 * Consumer for the delay-level example.
 * Subscribes to topic "Schedule" and prints the local time at which each message arrives,
 * so it can be compared with the send time printed by the producer.
 * Created by BaiLi
 */
public class ScheduleConsumer {
    /**
     * Starts the consumer and returns; consumption continues on the client's own threads.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        // Dedicated group: the other examples that use "SimplePushConsumer" subscribe to
        // different topics/tags, and consumers of one group must share the same subscription.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ScheduleConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        pushConsumer.subscribe("Schedule","*");
        MessageListenerConcurrently arrivalPrinter = (batch, context) -> {
            // One line per received message; the message content itself is not printed.
            for (MessageExt ignored : batch) {
                System.out.printf("接收时间:%s %n", LocalTime.now());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.setMessageListener(arrivalPrinter);
        pushConsumer.start();
        System.out.printf("Simple Consumer Started.%n");
    }
}
package schedule;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.time.LocalTime;
/**
 * Timer-message producer example.
 * Sends two messages to topic "Schedule" that are delayed by a millisecond value instead
 * of a delay level. Per the tutorial text, the default maximum delay is 3 days and can be
 * changed through the broker setting timerMaxDelaySec.
 * Created by BaiLi
 */
public class TimeProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("TimeProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        int i = 0;
        while (i < 2) {
            byte[] body = "TimeProducer".getBytes(StandardCharsets.UTF_8);
            Message timed = new Message("Schedule", "TagA", body);
            // Relative delay: deliver to consumers 10 seconds from now.
            // Absolute alternative: timed.setDeliverTimeMs(System.currentTimeMillis() + 10000L)
            timed.setDelayTimeMs(10000L);
            producer.send(timed);
            System.out.printf(i + ".发送消息成功:%s%n", LocalTime.now());
            i++;
        }
        producer.shutdown();
    }
}
package schedule;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.time.LocalTime;
import java.util.List;
/**
 * Consumer for the timer-message example.
 * Subscribes to topic "Schedule" and prints the local time at which each message arrives.
 * Created by BaiLi
 */
public class TimeConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("TimeConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        pushConsumer.subscribe("Schedule","*");
        MessageListenerConcurrently arrivalPrinter = (batch, context) -> {
            // One line per received message; the message content itself is not printed.
            for (MessageExt ignored : batch) {
                System.out.printf("接收时间:%s %n", LocalTime.now());
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.setMessageListener(arrivalPrinter);
        pushConsumer.start();
        System.out.printf("Simple Consumer Started.%n");
    }
}
生产者:
预定日期发送:org.apache.rocketmq.example.schedule.ScheduledMessageProducer.java
指定时间发送:org.apache.rocketmq.example.schedule.TimerMessageProducer.java
消费者:
预定日期消费:org.apache.rocketmq.example.schedule.ScheduledMessageConsumer.java
指定时间消费:org.apache.rocketmq.example.schedule.TimerMessageConsumer.java
5.5.批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的使用限制:
- 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
- 相同的Topic,
- 相同的waitStoreMsgOK
- 不能是延迟消息、事务消息等
package batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
/**
 * Batch send example.
 * Collects three messages for topic "simple" into one list and sends the list with a
 * single {@code send} call.
 * Created by BaiLi
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        String[] bodies = {"BatchProducer0", "BatchProducer1", "BatchProducer2"};
        ArrayList<Message> batch = new ArrayList<>();
        for (String body : bodies) {
            batch.add(new Message("simple","TagA", body.getBytes(StandardCharsets.UTF_8)));
        }
        SendResult outcome = producer.send(batch);
        System.out.printf(".发送消息成功:%s%n", outcome);
        producer.shutdown();
    }
}
package batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * Split-batch send example.
 * Builds 10000 messages and sends them in several batches produced by {@code ListSplitter},
 * so that no single batch exceeds the splitter's size limit.
 * Note: keep ListSplitter's SIZE_LIMIT at 10 * 1000, otherwise sending reports that the
 * message is too large.
 * Created by BaiLi
 */
public class SplitBatchProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SplitBatchProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        ArrayList<Message> all = new ArrayList<>();
        int seq = 0;
        while (seq < 10000) {
            String text = "SplitBatchProducer" + seq;
            all.add(new Message("simple","TagA", text.getBytes(StandardCharsets.UTF_8)));
            seq++;
        }
        // Each iteration sends the next size-bounded slice of the list.
        for (ListSplitter slices = new ListSplitter(all); slices.hasNext(); ) {
            List<Message> slice = slices.next();
            SendResult outcome = producer.send(slice);
            System.out.printf(".发送消息成功:%s%n", outcome);
        }
        producer.shutdown();
    }
}
/**
 * Iterates over a message list in consecutive sub-lists whose estimated size stays
 * within {@link #SIZE_LIMIT}.
 * The estimate for one message is the UTF-8 byte length of its topic and of every
 * property key and value, plus the body length, plus a fixed 20-byte allowance.
 * Each returned sub-list is a view of the list passed to the constructor.
 */
class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 10 * 1000; // maximum estimated size of one batch
    private static final int PER_MESSAGE_ALLOWANCE = 20; // fixed extra added per message (presumably framing/log overhead)
    private final List<Message> messages; // all messages still to be handed out
    private int currIndex; // index of the first message not yet returned

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    /**
     * Returns the next batch.
     * A single message whose own estimate exceeds the limit is returned alone as a
     * one-element batch; it is not dropped, so the problem shows up when the caller sends it.
     *
     * @return the next non-empty sub-list
     * @throws java.util.NoSuchElementException if every message has already been returned
     */
    @Override
    public List<Message> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException("no more message batches");
        }
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            int tmpSize = estimateSize(messages.get(nextIndex));
            if (tmpSize > SIZE_LIMIT) {
                // Oversized on its own: if it is the first message of this batch, emit it
                // alone; otherwise close the current batch just before it.
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            // Adding this message would overflow the batch: stop before it.
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            }
            totalSize += tmpSize;
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    /** Estimates the size of one message in bytes. */
    private static int estimateSize(Message message) {
        int size = utf8Length(message.getTopic()) + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            size += utf8Length(entry.getKey()) + utf8Length(entry.getValue());
        }
        return size + PER_MESSAGE_ALLOWANCE;
    }

    /** Byte length of {@code text} when encoded as UTF-8 (String.length() would count chars). */
    private static int utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }
}
生产者:
org.apache.rocketmq.example.batch.SimpleBatchProducer
org.apache.rocketmq.example.batch.SplitBatchProducer
5.6.过滤消息
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
5.6.1.使用Tag方式过滤(通过consumer.subscribe("FilterTopic", "TagA || TagC")实现):
package filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
 * Producer for the tag-filter example.
 * Sends 15 messages to topic "FilterTopic", cycling through the tags TagA, TagB and TagC.
 * Created by BaiLi
 */
public class TagFilterProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC"};
        int i = 0;
        while (i < 15) {
            // Round-robin over the tags; the tag is also embedded in the body.
            String tag = tags[i % tags.length];
            byte[] body = ("TagFilterProducer_"+tag).getBytes(StandardCharsets.UTF_8);
            SendResult send = producer.send(new Message("FilterTopic", tag, body));
            System.out.printf(i + ".发送消息成功:%s%n", send);
            i++;
        }
        producer.shutdown();
    }
}
package filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Consumer for the tag-filter example.
 * Subscribes to topic "FilterTopic" with the tag expression "TagA || TagC" and prints the
 * body of every message it receives.
 * Created by BaiLi
 */
public class TagFilterConsumer {
    /**
     * Starts the consumer and returns; consumption continues on the client's own threads.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        // Dedicated group: the other examples that use "SimplePushConsumer" subscribe to
        // different topics/expressions, and consumers of one group must share the same subscription.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("TagFilterConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        pushConsumer.subscribe("FilterTopic","TagA || TagC");
        MessageListenerConcurrently printer = (batch, context) -> {
            for (MessageExt received : batch) {
                // The producer encodes bodies as UTF-8; decode with the same charset.
                String body = new String(received.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.printf("收到消息: %s%n" , body);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.setMessageListener(printer);
        pushConsumer.start();
        System.out.printf("TagFilter Consumer Started.%n");
    }
}
生产者:
org.apache.rocketmq.example.filter.TagFilterProducer
消费者:
org.apache.rocketmq.example.filter.TagFilterConsumer
Tag是RocketMQ中特有的一个消息属性。
RocketMQ的最佳实践中就建议使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用Tag来区分。
Tag方式有一个很大的限制,就是一个消息只能有一个Tag,这在一些比较复杂的场景就有点不足了。 这时候可以使用SQL表达式来对消息进行过滤。
5.6.2.使用Sql方式过滤(通过MessageSelector.bySql(String sql)参数实现):
这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的自定义属性。
package filter;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
/**
 * Producer for the SQL-filter example.
 * Sends 15 messages to topic "FilterTopic", cycling through TagA/TagB/TagC, and attaches
 * the loop index as the user property "baiLi" so consumers can filter on it.
 * Created by BaiLi
 */
public class SqlFilterProducer {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC"};
        int i = 0;
        while (i < 15) {
            String tag = tags[i % tags.length];
            byte[] body = ("TagFilterProducer_"+tag + "_i_" + i).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message("FilterTopic", tag, body);
            // Custom property referenced by the consumer's SQL expression.
            msg.putUserProperty("baiLi", String.valueOf(i));
            SendResult send = producer.send(msg);
            System.out.printf(i + ".发送消息成功:%s%n", send);
            i++;
        }
        producer.shutdown();
    }
}
package filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Consumer for the SQL-filter example.
 * Subscribes to topic "FilterTopic" with a SQL expression that keeps messages tagged TagA
 * or TagC whose user property "baiLi" lies between 0 and 3, and prints their bodies.
 * Created by BaiLi
 */
public class SqlFilterConsumer {
    /**
     * Starts the consumer and returns; consumption continues on the client's own threads.
     *
     * @param args unused
     * @throws MQClientException declared by the subscribe/start calls
     */
    public static void main(String[] args) throws MQClientException {
        // Dedicated group: the other examples that use "SimplePushConsumer" subscribe to
        // different topics/expressions, and consumers of one group must share the same subscription.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        pushConsumer.setNamesrvAddr("192.168.43.137:9876");
        MessageSelector selector = MessageSelector.bySql("(TAGS is not null And TAGS IN ('TagA','TagC'))"
                + "and (baiLi is not null and baiLi between 0 and 3)");
        pushConsumer.subscribe("FilterTopic", selector);
        MessageListenerConcurrently printer = (batch, context) -> {
            for (MessageExt received : batch) {
                // The producer encodes bodies as UTF-8; decode with the same charset.
                String body = new String(received.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.printf("收到消息: %s%n" , body);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
        pushConsumer.setMessageListener(printer);
        pushConsumer.start();
        System.out.printf("SqlFilter Consumer Started.%n");
    }
}
生产者:
org.apache.rocketmq.example.filter.SqlFilterProducer
消费者:
org.apache.rocketmq.example.filter.SqlFilterConsumer
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。我们可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL ,IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
使用注意:
- 只有推模式的消费者可以使用SQL过滤。拉模式是用不了的;
- 另外消息过滤是在Broker端进行的,提升网络传输性能,但是broker服务会比较繁忙。(consumer将过滤条件推送给broker端)
5.7.事务消息
这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。
5.7.1.什么是事务消息
事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。它可以保证本地事务执行与消息发送两个操作的原子性,也就是这两个操作一起成功或者一起失败。
5.7.2.事务消息的实现机制
事务消息机制的关键是在发送消息时会将消息转为一个half半消息,并存入RocketMQ内部的一个Topic(RMQ_SYS_TRANS_HALF_TOPIC),这个Topic对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
5.7.3.事务消息的编程模型
事务消息只保证消息发送者的本地事务与发消息这两个操作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。
事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。
package transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
/**
 * Transactional-message producer example.
 * Sends 10 messages to topic "TransactionTopic", cycling through TagA..TagE, via
 * {@code sendMessageInTransaction}; the registered {@link TransactionListener} decides
 * what happens to each message. The program then sleeps for 100 seconds so that the
 * broker's check callbacks can arrive before the producer is shut down.
 * Created by BaiLi
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("TransProducer");
        producer.setNamesrvAddr("192.168.43.137:9876");
        // Thread pool handed to the producer; every worker gets the same fixed name.
        ThreadFactory checkThreads = task -> {
            Thread worker = new Thread(task);
            worker.setName("client-transaction-msg-check-thread");
            return worker;
        };
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), checkThreads);
        producer.setExecutorService(executorService);
        // Local transaction listener.
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD","TagE"};
        int i = 0;
        while (i < 10) {
            String tag = tags[i % tags.length];
            byte[] body = ("Transaction_"+ tag).getBytes(StandardCharsets.UTF_8);
            Message message = new Message("TransactionTopic", tag, body);
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
            System.out.printf("%s%n", transactionSendResult);
            Thread.sleep(10); // 10 ms between sends
            i++;
        }
        Thread.sleep(100000); // wait for the broker's check callbacks
        producer.shutdown();
    }
}
package transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Local transaction listener for the transactional-message example.
 * The decision for each message is driven purely by its tag so that every outcome
 * (commit, rollback, unknown) can be observed.
 * Created by BaiLi
 */
public class TransactionListenerImpl implements TransactionListener {
    /**
     * Runs after the transactional message has been submitted.
     * COMMIT_MESSAGE makes the message available to consumers right away,
     * ROLLBACK_MESSAGE discards it, and UNKNOW makes the broker check the
     * transaction state again later.
     *
     * @param message the message that was sent
     * @param o       the argument passed to sendMessageInTransaction (unused here)
     * @return COMMIT_MESSAGE for TagA, ROLLBACK_MESSAGE for TagB, UNKNOW otherwise
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String tags = message.getTags();
        // TagA: consumable immediately.
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // TagB: discarded.
        if (StringUtils.contains(tags, "TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Everything else waits for the broker's state check.
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Runs when the broker checks back on a message whose state is still unknown.
     * The meaning of the returned states is the same as for
     * {@link #executeLocalTransaction(Message, Object)}.
     *
     * @param messageExt the message being checked
     * @return COMMIT_MESSAGE for TagC, ROLLBACK_MESSAGE for TagD, UNKNOW otherwise
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String tags = messageExt.getTags();
        // TagC: becomes consumable once the check runs.
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // TagD: discarded by the check.
        if (StringUtils.contains(tags, "TagD")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // The rest (TagE) stays unknown and is dropped after repeated checks.
        return LocalTransactionState.UNKNOW;
    }
}
生产者:
org.apache.rocketmq.example.transaction.TransactionProducer
5.7.4.事务消息的使用限制
- 事务消息不支持延迟消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N = transactionCheckMax)则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。可以通过重写AbstractTransactionalMessageCheckListener类来修改这个行为。
- 事务性消息可能不止一次被检查或消费。