在生产者将消息发送到消息队列后,他们不期望立即消费,而是等待消费者在指定时间后消费。这种消息通常被称为延迟消息。事实上,延迟消息的应用场景非常广泛,如以下场景:
- 在线直播教学时,在课程开始前15分钟通知所有学生准备上课。
- 订单提交成功后一小时内未付款,订单需及时关闭并释放相应商品的库存。
- 当用户未登录超过15天时,将召回推送发送给用户。
- 提交工单后24小时以上未处理,并向相关负责人发出催促处理提醒。
针对延迟消息,本文分享了五个实现方案。让我们逐一讨论各种方案的总体实现和优缺点。
一、RedisRedis中有一个有序的集合(Sorted Set)在有序集合中的数据结构中,所有元素都是基于它的 Score 排序。我们可以把消息的预期时间戳作为Score,定期任务可以不断读取Score大于当前时间的元素。基本流程如下:
- 调用API,传输执行时间、消息体等数据。
- 在Redis的String结构中,生成唯一的key,序列化消息体数据。
- 将key和执行时间戳存储在Redis的有序集合结构中,而不是存储特定的信息数据,而是存储唯一的key。
- 定时任务不断读取时间戳最小的消息。
- 如果时间戳小于当前时间,将key放入Redis作为队列的List结构中。
- 另一个定时任务不断地从队列中读取需要消费的信息。
- 根据key获取消息体数据,消费消息。
- 如果消费信息成功,删除key对应的消息数据。
- 如果消费信息失败,重新存储key和时间戳(加60秒)。
具体方案如下:
为了避免在有序集合中存储过多的延迟信息、存储操作和查询操作速度较慢的问题,可以建立多个有序集合,并通过哈希算法将信息路由集中到不同的有序集合中。
优点简单实用,落地快。
缺点- 单个有序集合不能支持太多的数据。
- 不断阅读定期任务可能会导致不必要的请求。
因此,Redis方案不是一个非常成熟的方案,而是一个支持小新闻快速实施的方案。
二、RabbitMQRabitMQ本身并不支持延迟消息功能,一般的做法是通过最大的生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)模拟延迟消息功能的两个特性。如果消息超过最大生存时间,不被消费,就会变成死信,重新送到死信交换机,然后死信交换机按照绑定规则转发到相应的死信队列。监控队列可以重新消耗消息。
然而,RabitMQ的3.5.8版本结束后,我们可以使用官方推荐的rabbitmq delayed message exchange插件很容易实现延迟消息的功能。
安装插件
先在官方插件列表页面下载rabbitmq__delayed_message_exchang插件,然后复制到RabitMQ每个节点的plugins目录中。使用命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
一旦插件启用,我们就可以开始使用它了。
使用示例
安装插件后,将生成Exchange类型,支持延迟交付机制:x-delayed-message。收到这类消息后,消息不会立即发送到目标队列,而是存储在mnesia表中,然后在检测到可交付时间后发送到目标队列。
使用延迟消息时,需要先声明一个x-delayed-message
交换器类型:
Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);
在header中添加延迟消息x-delay
,毫秒表示延迟:
byte[] messageBodyBytes = "This is a delayed message".getBytes();AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();headers = new HashMap<String, Object>();headers.put("x-delay", 5000);props.headers(headers);channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);
优点大品牌中间件,可靠稳定。
缺点由于master单节点,性能瓶颈和吞吐量有限。
三、ActiveMQActiveMQ开始支持5.4及以上版本的持久延迟信息功能,甚至支持Cron表达式。默认情况下,如果需要修改配置文件activemq,则不会打开该功能.xml,将schedulerSupport属性设置为broker节点的true,例如:
<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true"></broker>
当服务端打开延迟消息功能时,客户端可以利用以下属性发送延迟消息:
- AMQ_SCHEDULED_DELAY:该消息延迟发送时间,单位为毫秒。
- AMQ_SCHEDULED_PERIOD:单位每次重新发送消息的时间间隔为毫秒。
- AMQ_SCHEDULED_REPEAT:重新发送消息的次数。
- AMQ_SCHEDULED_CRON:使用Cron表达式设置发送消息的时机。
- 发送消息延迟60秒:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);producer.send(message);
- 每次间隔10秒,消息延迟60秒,重复发送5次:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);
- 每天凌晨3点,使用Cron表达式发送一条消息:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");producer.send(message);
需要注意的是,Cron表达式由五位组成,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示从周日到周六)。
- Cron表达式的优先级高于其他参数。如果在设置Cron表达式的同时设置其他参数,则将在每次Cron执行时应用其他参数。例如,该消息延迟60秒,重复5次,间隔10秒,每小时发送一系列信息:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);
优点大品牌中间件,可靠稳定,甚至支持Cron表达式。
缺点由于master单节点,性能瓶颈和吞吐量有限。
四、RocketMQ在RocketMQ中,支持延迟信息,但不支持任何时间精度的延迟信息,只支持特定级别的延迟信息。如果你想支持任何时间精度,你不能避免在Broker层面进行新闻排序,然后涉及持久性考虑,那么新闻排名将不可避免地产生巨大的性能成本。
消息延迟水平分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。发送消息时,可以设置消息延迟级别,设置消息延迟级别时有以下三种情况:
- 如果设置消息延迟等级为0,则该消息为非延迟消息。
- 设置消息延迟级别大于或等于1,小于或等于18。如果设置消息延迟级别等于1,则延迟1s;设置新闻延迟等级为2,则延迟5s,以此类推。
- 如果新闻延迟级别大于18,则新闻延迟级别为18,如果新闻延迟级别等于20,则延迟2h。
首先,为消费延迟写一个消费者信息:
public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 实例消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 设置Nameserver的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或多个Topic,标签过滤需要消费的消息 consumer.subscribe("OneMoreTopic", "*"); // 注册回调实现类处理从broker拉回来的消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 这个消息已经成功消费了 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者实例 consumer.start(); System.out.println("Consumer Started."); }}
另一位制作延迟信息的制作人用于发送延迟信息:
public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // Producer DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 设置Nameserver的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" //设置消息延迟级别为3,也就是说,延迟10s。 msg.setDelayTimeLevel(3); // 将消息发送到Broker SendResult sendResult = producer.send(msg); // 通过sendresult返回消息是否成功送达 System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不发消息,关闭Producer实例。 producer.shutdown(); }}
生产者运营后,会发送延迟消息:
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006DAB018B4AC216EDB60000000000000
10秒后,消费者收到了这条延迟消息:
10:37:25.026 ConsumeMessageThread_1 Receive New Messages:Msg Id: B6000Body,C006DAB018B4AC216E: This is a delay message.
优点分布式、高吞吐量、高性能、高可靠性。
缺点不能自定义延迟时间,只支持18个特定级别的延迟。
五、定制RocketMQ上面提到的不支持定制延迟时间的RocketMQ是开源版,但是阿里云商业版的RocketMQ是支持的,可能是因为业务需求弱或者考虑业务因素,原因不得而知。如果可能的话,可以直接去阿里云;如果可能的话,可以修改开源版RocketMQ的源码,满足自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版,如何支持18个时间级别:
原理分析
RocketMQ源码的版本号为4.7.1.不同版本的源码略有差异。
CommitLog
在Commitlog中,对延迟消息进行了一些处理:
// 延迟级别大于0,即延迟消息if (msg.getDelayTimeLevel() > 0) { // 如果当前延迟水平大于最大延迟水平,则判断当前延迟水平 // 将当前延迟水平设置为最大延迟水平。 if (msg.getDelayTimeLevel() > this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 获取延迟消息的主题, // RMQ_其中RMQ__SYS_SCHEDULESCHEDULE__TOPIC值TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 队列ID根据延迟级别获取延迟消息, // 事实上,队列ID的延迟级别减少了1 queueId = ScheduleMessageService.delaylevel2QueueId(msg.getDelayTimeLevel()); // 备份真正的主题和队列ID MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageproperties2String(msg.getProperties())); // 设置延迟消息的主题和队列ID msg.setTopic(topic); msg.setQueueId(queueId);}
可以看出,每一个延迟消息的主题都被暂时改为SCHEDULE_TOPIC_XXXX,并根据延迟级别延迟消息更改新的队列ID。接下来,ScheduleMessageservice将处理延迟消息。
ScheduleMessageServiceSchedulemessageservice由defaultmessagestore初始化,包括构建对象和调用load
方法。最后,执行ScheduleMessageService。start
方法:
public void start() { // 使用Atomicbolean确保start方法只能有效执行一次 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 所有延迟级别遍历 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key是延迟级别 Integer level = entry.getKey(); // value对应于延迟级别的毫秒数 Long timeDelay = entry.getValue(); // 根据延迟级别获得相应队列的偏移 Long offset = this.offsetTable.get(level); // 若偏移量为null,则设置为0 if (null == offset) { offset = 0L; } if (timeDelay != null) { // 为每个延迟级别创建定时任务, // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒 this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 每隔flushDelayoffsetinterval延迟10秒后执行一次任务。 // 其中,flushdelayoffsetinterval默认配置也是10秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 每个队列消费的持续偏移 if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); }}
遍历所有延迟级别,根据延迟级别获得相应队列的偏移。如果没有偏移,则设置为0。然后为每个延迟级别创建一个定时任务。第一个启动任务延迟1秒,第二个和以后的启动任务延迟是延迟级别的相应延迟时间。
然后,为每个队列的消费偏移创建了一个定时任务。持久频率由flushdelayofsetinterval属性配置,默认为10秒。
定时任务ScheduleMessageServicestart
方法实施后,每个延迟级别都会创建自己的定时任务。这里定时任务的具体实现在deliverdelayedmesagetimertask类中。它的核心代码是executeontimeup方法。我们来看看主要部分:
// 根据主题和队列ID获取消息队列ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delaylevel2QueueId(delayLevel));
如果没有获得相应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:
// SelectMapedbuferesulttersultter根据消费偏移量从消息队列中获取所有有效消息 bufferCQ = cq.getIndexBuffer(this.offset);
如果你没有得到有效的信息,那就在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:
// 所有新闻for遍历 (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 获取消息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 获取消息的物理长度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分代码... long now = System.currentTimeMillis(); // 计算消息应该消耗的时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 计算下一条消息的偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)long countdown = deliverTimestamp - now; // 省略部分代码...}
如果当前消息不到消费时间,那就是countdown
毫秒后执行任务。在消费时间内,继续执行以下操作:
// Messageexttt根据消息的物理偏移量和大小获取消息 msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy);
若获得消息,则继续执行以下操作:
// 新消息的重建包括:/// 1.删除消息的延迟级别// 2.恢复真实的消息主题和队列Idmessageeextbrokeriner msgInner = this.messageTimeup(msgExt);
// 将消息重新发送到PutMessageresultteresulter putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);
消除了消息的延迟级别,恢复了真实的消息主题和队列ID,并将消息发送到真实的消息队列之后,消费者可以立即消费。
由于篇幅有限,源码的细节没有展开太多,感兴趣的朋友可以去GitHub下载源码仔细阅读。
定制化方案通过以上对源码的分析,可以总结出延迟消息的实现步骤:
- 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列ID为延迟级减1。
- SCHEDULE_TOPIC_XXXX队列中。
- 根据上次提取的偏移量,定时任务不断从队列中提取所有消息。
- 根据消息的物理偏移量和大小再次获取消息。
- 根据新闻属性重新创建新闻,删除延迟级别,恢复原主题和队列ID。
- 将消息重新发送到原主题队列,供消费者消费。
总结如下图所示:
在commitlog中,我们可以根据自定义的延迟时间选择最大的延迟级别,比如15分钟消费的消息,那么最大的延迟级别是10分钟。在ScheduleMessageservice中,判断消息是否真的是消费时间。如果是消费时间,恢复原主题和队列ID;如果没有时间消费,则选择最大延迟级别重新修改主题和队列ID。如下图:
Commitlog中存储着新闻主体和元数据,只存储在Commitlog中的起始物理偏移、新闻大小和新闻标签的哈希值。虽然新闻需要重新放入队列,但空间浪费仍然相对有限。
优点支持自定义延迟时间的分布式、高吞吐量、高性能、高可靠性。
缺点定制RocketMQ不易维护,不能升级新版本。
总结从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案:
- 使用Redis的Sorted Set结构。
- rabbitmq使用RabbitMQ delayed message exchange插件。
- 使用ActiveMQ的5.4及以上版本的延迟消息功能。
- 使用RocketMQ只支持特定级别的延迟消息。
- 定制RocketMQ,通过重新计算延迟级别来实现自定义延迟。
以上每个方案都有自己的优缺点,所以没有一个普遍的延迟消息方案,最合适的方案需要根据数据规模和业务需求的实际情况来确定。
- 在线直播教学时,在课程开始前15分钟通知所有学生准备上课。
- 订单提交成功后一小时内未付款,订单需及时关闭并释放相应商品的库存。
- 当用户未登录超过15天时,将召回推送发送给用户。
- 提交工单后24小时以上未处理,并向相关负责人发出催促处理提醒。
针对延迟消息,本文分享了五个实现方案。让我们逐一讨论各种方案的总体实现和优缺点。
一、RedisRedis中有一个有序的集合(Sorted Set)在有序集合中的数据结构中,所有元素都是基于它的 Score 排序。我们可以把消息的预期时间戳作为Score,定期任务可以不断读取Score大于当前时间的元素。基本流程如下:
- 调用API,传输执行时间、消息体等数据。
- 在Redis的String结构中,生成唯一的key,序列化消息体数据。
- 将key和执行时间戳存储在Redis的有序集合结构中,而不是存储特定的信息数据,而是存储唯一的key。
- 定时任务不断读取时间戳最小的消息。
- 如果时间戳小于当前时间,将key放入Redis作为队列的List结构中。
- 另一个定时任务不断地从队列中读取需要消费的信息。
- 根据key获取消息体数据,消费消息。
- 如果消费信息成功,删除key对应的消息数据。
- 如果消费信息失败,重新存储key和时间戳(加60秒)。
具体方案如下:
为了避免在有序集合中存储过多的延迟信息、存储操作和查询操作速度较慢的问题,可以建立多个有序集合,并通过哈希算法将信息路由集中到不同的有序集合中。
优点简单实用,落地快。
缺点- 单个有序集合不能支持太多的数据。
- 不断阅读定期任务可能会导致不必要的请求。
因此,Redis方案不是一个非常成熟的方案,而是一个支持小新闻快速实施的方案。
二、RabbitMQRabitMQ本身并不支持延迟消息功能,一般的做法是通过最大的生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)模拟延迟消息功能的两个特性。如果消息超过最大生存时间,不被消费,就会变成死信,重新送到死信交换机,然后死信交换机按照绑定规则转发到相应的死信队列。监控队列可以重新消耗消息。
然而,RabitMQ的3.5.8版本结束后,我们可以使用官方推荐的rabbitmq delayed message exchange插件很容易实现延迟消息的功能。
安装插件
先在官方插件列表页面下载rabbitmq__delayed_message_exchang插件,然后复制到RabitMQ每个节点的plugins目录中。使用命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
一旦插件启用,我们就可以开始使用它了。
使用示例
安装插件后,将生成Exchange类型,支持延迟交付机制:x-delayed-message。收到这类消息后,消息不会立即发送到目标队列,而是存储在mnesia表中,然后在检测到可交付时间后发送到目标队列。
使用延迟消息时,需要先声明一个x-delayed-message
交换器类型:
Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);
在header中添加延迟消息x-delay
,毫秒表示延迟:
byte[] messageBodyBytes = "This is a delayed message".getBytes();AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();headers = new HashMap<String, Object>();headers.put("x-delay", 5000);props.headers(headers);channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);
优点大品牌中间件,可靠稳定。
缺点由于master单节点,性能瓶颈和吞吐量有限。
三、ActiveMQActiveMQ开始支持5.4及以上版本的持久延迟信息功能,甚至支持Cron表达式。默认情况下,如果需要修改配置文件activemq,则不会打开该功能.xml,将schedulerSupport属性设置为broker节点的true,例如:
<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true"></broker>
当服务端打开延迟消息功能时,客户端可以利用以下属性发送延迟消息:
- AMQ_SCHEDULED_DELAY:该消息延迟发送时间,单位为毫秒。
- AMQ_SCHEDULED_PERIOD:单位每次重新发送消息的时间间隔为毫秒。
- AMQ_SCHEDULED_REPEAT:重新发送消息的次数。
- AMQ_SCHEDULED_CRON:使用Cron表达式设置发送消息的时机。
- 发送消息延迟60秒:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);producer.send(message);
- 每次间隔10秒,消息延迟60秒,重复发送5次:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);
- 每天凌晨3点,使用Cron表达式发送一条消息:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("This is a delayed message");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");producer.send(message);
需要注意的是,Cron表达式由五位组成,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示星期日至星期六)。
- Cron表达式的优先级高于其他参数。如果在设置Cron表达式的同时设置其他参数,则将在每次Cron执行时应用其他参数。例如,该消息延迟60秒,重复5次,间隔10秒,每小时发送一系列信息:
MessageProducer producer = session.createProducer(destination);TextMessage message = session.createTextMessage("test msg");message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);producer.send(message);
优点大品牌中间件,可靠稳定,甚至支持Cron表达式。
缺点由于master单节点,性能瓶颈和吞吐量有限。
四、RocketMQ在RocketMQ中,支持延迟信息,但不支持任何时间精度的延迟信息,只支持特定级别的延迟信息。如果你想支持任何时间精度,你不能避免在Broker层面进行新闻排序,然后涉及持久性考虑,那么新闻排名将不可避免地产生巨大的性能成本。
消息延迟水平分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。发送消息时,可以设置消息延迟级别,设置消息延迟级别时有以下三种情况:
- 如果设置消息延迟等级为0,则该消息为非延迟消息。
- 设置消息延迟级别大于或等于1,小于或等于18。如果设置消息延迟级别等于1,则延迟1s;设置新闻延迟等级为2,则延迟5s,以此类推。
- 如果新闻延迟级别大于18,则新闻延迟级别为18,如果新闻延迟级别等于20,则延迟2h。
首先,为消费延迟写一个消费者信息:
public class Consumer { public static void main(String[] args) throws MQClientException { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 实例消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup"); // 设置Nameserver的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或多个Topic,标签过滤需要消费的消息 consumer.subscribe("OneMoreTopic", "*"); // 注册回调实现类处理从broker拉回来的消息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s %s Receive New Messages:%n" , sdf.format(new Date()) , Thread.currentThread().getName()); for (MessageExt msg : msgs) { System.out.printf("\tMsg Id: %s%n", msg.getMsgId()); System.out.printf("\tBody: %s%n", new String(msg.getBody())); } // 这个消息已经成功消费了 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者实例 consumer.start(); System.out.println("Consumer Started."); }}
另一位制作延迟信息的制作人用于发送延迟信息:
public class DelayProducer { public static void main(String[] args) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // Producer DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup"); // 设置Nameserver的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); Message msg = new Message("OneMoreTopic" , "DelayMessage", "This is a delay message.".getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" //设置消息延迟级别为3,也就是说,延迟10s。 msg.setDelayTimeLevel(3); // 将消息发送到Broker SendResult sendResult = producer.send(msg); // 通过sendresult返回消息是否成功送达 System.out.printf("%s Send Status: %s, Msg Id: %s %n" , sdf.format(new Date()) , sendResult.getSendStatus() , sendResult.getMsgId()); // 如果不发消息,关闭Producer实例。 producer.shutdown(); }}
生产者运行后,会发送延迟消息:/p>
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006DAB018B4AC216EDB60000000000000
10秒后,消费者收到了这条延迟消息:
10:37:25.026 ConsumeMessageThread_1 Receive New Messages:Msg Id: B6000Body,C006DAB018B4AC216E: This is a delay message.
优点分布式、高吞吐量、高性能、高可靠性。
缺点不能自定义延迟时间,只支持18个特定级别的延迟。
五、定制RocketMQ上面提到的不支持定制延迟时间的RocketMQ是开源版,但是阿里云商业版的RocketMQ是支持的,可能是因为业务需求弱或者考虑业务因素,原因不得而知。如果可能的话,可以直接去阿里云;如果可能的话,可以修改开源版RocketMQ的源码,满足自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版,如何支持18个时间级别:
原理分析
RocketMQ源码的版本号为4.7.1.不同版本的源码略有差异。
CommitLog
在Commitlog中,对延迟消息进行了一些处理:
// 延迟级别大于0,即延迟消息if (msg.getDelayTimeLevel() > 0) { // 如果当前延迟水平大于最大延迟水平,则判断当前延迟水平 // 将当前延迟水平设置为最大延迟水平。 if (msg.getDelayTimeLevel() > this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore .getScheduleMessageService().getMaxDelayLevel()); } // 获取延迟消息的主题, // RMQ_其中RMQ__SYS_SCHEDULESCHEDULE__TOPIC值TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 队列ID根据延迟级别获取延迟消息, // 事实上,队列ID的延迟级别减少了1 queueId = ScheduleMessageService.delaylevel2QueueId(msg.getDelayTimeLevel()); // 备份真正的主题和队列ID MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageproperties2String(msg.getProperties())); // 设置延迟消息的主题和队列ID msg.setTopic(topic); msg.setQueueId(queueId);}
可以看出,每一个延迟消息的主题都被暂时改为SCHEDULE_TOPIC_XXXX,并根据延迟级别延迟消息更改新的队列ID。接下来,ScheduleMessageservice将处理延迟消息。
ScheduleMessageServiceSchedulemessageservice由defaultmessagestore初始化,包括构建对象和调用load
方法。最后,执行ScheduleMessageService。start
方法:
public void start() { // 使用Atomicbolean确保start方法只能有效执行一次 if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); // 所有延迟级别遍历 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // key是延迟级别 Integer level = entry.getKey(); // value对应于延迟级别的毫秒数 Long timeDelay = entry.getValue(); // 根据延迟级别获得相应队列的偏移 Long offset = this.offsetTable.get(level); // 若偏移量为null,则设置为0 if (null == offset) { offset = 0L; } if (timeDelay != null) { // 为每个延迟级别创建定时任务, // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒 this.timer.schedule( new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 每隔flushDelayoffsetinterval延迟10秒后执行一次任务。 // 其中,flushdelayoffsetinterval默认配置也是10秒 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 每个队列消费的持续偏移 if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore .getMessageStoreConfig().getFlushDelayOffsetInterval()); }}
遍历所有延迟级别,根据延迟级别获得相应队列的偏移。如果没有偏移,则设置为0。然后为每个延迟级别创建一个定时任务。第一个启动任务延迟1秒,第二个和以后的启动任务延迟是延迟级别的相应延迟时间。
然后,为每个队列的消费偏移创建了一个定时任务。持久频率由flushdelayofsetinterval属性配置,默认为10秒。
定时任务ScheduleMessageServicestart
方法实施后,每个延迟级别都会创建自己的定时任务。这里定时任务的具体实现在deliverdelayedmesagetimertask类中。它的核心代码是executeontimeup方法。我们来看看主要部分:
// 根据主题和队列ID获取消息队列ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue( TopicValidator.RMQ_SYS_SCHEDULE_TOPIC , delaylevel2QueueId(delayLevel));
如果没有获得相应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:
// SelectMapedbuferesulttersultter根据消费偏移量从消息队列中获取所有有效消息 bufferCQ = cq.getIndexBuffer(this.offset);
如果你没有得到有效的信息,那就在DELAY_FOR_A_WHILE(默认为100)毫秒后执行任务。如果获得,继续执行以下操作:
// 所有新闻for遍历 (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 获取消息的物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); // 获取消息的物理长度 int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略部分代码... long now = System.currentTimeMillis(); // 计算消息应该消耗的时间 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);// 计算下一条消息的偏移量 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)long countdown = deliverTimestamp - now; // 省略部分代码...}
如果当前消息不到消费时间,那就是countdown
毫秒后执行任务。在消费时间内,继续执行以下操作:
// Messageexttt根据消息的物理偏移量和大小获取消息 msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy);
若获得消息,则继续执行以下操作:
// 新消息的重建包括:/// 1.删除消息的延迟级别// 2.恢复真实的消息主题和队列Idmessageeextbrokeriner msgInner = this.messageTimeup(msgExt);
// 将消息重新发送到PutMessageresultteresulter putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner);
消费者可以立即消费,消除消息的延迟水平,恢复真实的消息主题和队列ID,并将消息发送到真实的消息队列。
由于篇幅有限,源码的细节没有展开太多,感兴趣的朋友可以去GitHub下载源码仔细阅读。
定制化方案通过以上对源码的分析,可以总结出延迟消息的实现步骤:
- 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列ID为延迟级减1。
- SCHEDULE_TOPIC_XXXX队列中。
- 根据上次提取的偏移量,定时任务不断从队列中提取所有消息。
- 根据消息的物理偏移量和大小再次获取消息。
- 根据新闻属性重新创建新闻,删除延迟级别,恢复原主题和队列ID。
- 将消息重新发送到原主题队列,供消费者消费。
总结如下图所示:
在commitlog中,我们可以根据自定义的延迟时间选择最大的延迟级别,比如15分钟消费的消息,那么最大的延迟级别是10分钟。在ScheduleMessageservice中,判断消息是否真的是消费时间。如果是消费时间,恢复原主题和队列ID;如果没有时间消费,则选择最大延迟级别重新修改主题和队列ID。如下图:
Commitlog中存储着新闻主体和元数据,只存储在Commitlog中的起始物理偏移、新闻大小和新闻标签的哈希值。虽然新闻需要重新放入队列,但空间浪费仍然相对有限。
优点支持自定义延迟时间的分布式、高吞吐量、高性能、高可靠性。
缺点定制RocketMQ不易维护,不能升级新版本。
总结从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案:
- 使用Redis的Sorted Set结构。
- rabbitmq使用RabbitMQ delayed message exchange插件。
- 使用ActiveMQ的5.4及以上版本的延迟消息功能。
- 使用RocketMQ只支持特定级别的延迟消息。
- 定制RocketMQ,通过重新计算延迟级别来实现自定义延迟。
以上每个方案都有自己的优缺点,所以没有一个普遍的延迟消息方案,最合适的方案需要根据数据规模和业务需求的实际情况来确定。
