1.概述
为了实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange
但是RabitMQ版本必须是3.5.8以上支持插件,否则必须使用其死信队列功能。
- 检查插件的使用情况
rabbitmq-plugins list
用于查看RabbitMQ安装的插件。
rabbitmq-plugins list
检查RabbitMQ插件的安装情况
- 下载插件
如果没有安装插件,直接访问官网下载
https://www.rabbitmq.com/community-plugins.html
- 安装插件
下载后,将其复制到RabitMQ安装目录的plugins目录;并解压,如:
E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins
打开cmd命令行窗口。如果系统配备了RabitMQ环境变量,则直接执行以下命令进行安装;否则,您需要进入RabitMQ安装目录的sbin目录。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.实现RabitMQ消息队列延迟功能
- pom.在xml配置信息文件中添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.olive</groupId><artifactId>rabbitmq-spring-demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</version><relativePath /></parent><dependencies><!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
- application.RabitMQ信息配置在yml配置文件中
server: port: 8080spring: #给项目起个名字 application: name: rabbitmq-spring-demo #rabitMq配置 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin123 #虚拟host。使用server默认hosttt可以不设置;不同虚拟路径下的队列是隔离的 virtual-host: /
- RabitMQ配置类别
package com.olive.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * RabitMQ配置类别 **/@Configurationpublic class RabbitMqConfig {public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";public static final String DELAY_QUEUE_NAME = "delay_queue_name";public static final String DELAY_ROUTING_KEY = "delay_routing_key";@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Queue queue() {Queue queue = new Queue(DELAY_QUEUE_NAME, true);return queue;}@Beanpublic Binding binding(Queue queue, CustomExchange delayExchange) {return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}
- 发送消息
实现消息发送,设置消息延迟5s。
package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.olive.config.RabbitMqConfig;/** * 消息发送者 **/@Servicepublic class CustomMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:" + sdf.format(new Date()));rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_ROUTING_KEY, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息延迟5秒message.getMessageProperties().setHeader("x-delay", 5000);return message;}});}}
- 接收消息
package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.olive.config.RabbitMqConfig;/** * 消息接收者 **/@Componentpublic class CustomMessageReceiver {@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(sdf.format(new Date()) + msg);System.out.println("Receiver:执行取消订单”);}}
- 测试验证
package com.olive.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.olive.service.CustomMessageSender;@RestControllerpublic class DelayMessageController {@Autowiredprivate CustomMessageSender customMessageSender;@GetMapping("/sendMessage")public String sendMessage() {// customessagesender发送消息.sendMsg(”你已经支付了超时费,取消订单通知!");return "success";}}
发送消息,访问
http://127.0.0.1:8080/sendMessage
查看控制台打印的信息