当前位置: 首页 > 图灵资讯 > 技术篇> SpringBoot RabbitMQ死信队列

SpringBoot RabbitMQ死信队列

来源:图灵教育
时间:2023-05-04 10:44:47

 

1. 死信定义

不能消费的消息叫死信。

如果死信一直留在队列中,就会导致消费,但从不成功消费。有一个专门存储死信的队列,叫做死信队列(DDX, dead-letter-exchange)。

死信队列

DLX,Dead Letter Exchange的缩写,死信邮箱,死信交换机。其实DLX是普通的交换机,和普通的交换机没什么区别。当消息在一个队列中变成死信时(dead message)当死信通过这个交换机发送到死信队列时(RabbitMQ会自动发送相关参数)。

死信的几个来源:

  • 新闻TTL过期了(time to live,存活时间可用于限时支付消息)
  • 队列达到最大长度(队列满了,不能路过队列)
  • 消息被拒绝(basic.reject/basic.nack),而requeue = false

SpringBoot RabbitMQ死信队列_rabbitmq

2. 创建项目
  • pom.xml配置如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.olive</groupId><artifactId>rabbitmq-spring-demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</version><relativePath /></parent><dependencies><!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
  • application.yml
server:  port: 8080spring:  #给项目起个名字  application:    name: rabbitmq-spring-demo  #rabitMq配置 服务器  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    password: admin123    #虚拟host。使用server默认hosttt可以不设置;不同虚拟路径下的队列是隔离的    virtual-host: /
  • 准备MQ的队列和环境场景

正常交换机

  1. 正常队列(最长队列5);正常消费者拒绝消息
  2. tt队列(过期时间60秒);没有消费者

死信交换机

  1. 死信队列
package com.olive.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DeadConfig {/************ 正常配置 ******************//** * 正常交换机,开启持久化 */@BeanDirectExchange normalExchange() {return new DirectExchange("normalExchange", true, false);}@Beanpublic Queue normalQueue() {// durable: 是否持久,默认为false,持久队列:将存储在磁盘上,当新闻代理重启时仍然存在,暂存队列:当前连接有效/// exclusive: 默认也是false,只能用于当前创建的连接,当连接关闭时,队列将被删除。本参考优先级高于durable//// autoDelete: 是否自动删除,当没有生产者或消费者使用该队列时,该队列将自动删除。Map<String, Object> args = deadQueueArgs();// argss队列设置最大长度.put("x-max-length", 5);return new Queue("normalQueue", true, false, false, args);}@Beanpublic Queue ttlQueue() {Map<String, Object> args = deadQueueArgs();// 队列设置信息过期时间 60 秒args.put("x-message-ttl", 60 * 1000);return new Queue("ttlQueue", true, false, false, args);}@BeanBinding normalRouteBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRouting");}@BeanBinding ttlRouteBinding() {return BindingBuilder.bind(ttlQueue()).to(normalExchange()).with("ttlRouting");}/**************** 死信配置 *****************//** * 死信交换机 */@BeanDirectExchange deadExchange() {return new DirectExchange("deadExchange", true, false);}/** * 死信队列 */@Beanpublic Queue deadQueue() {return new Queue("deadQueue", true, false, false);}@BeanBinding deadRouteBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");}/** * 转发到 死信队列,配置参数 */private Map<String, Object> deadQueueArgs() {Map<String, Object> map = new HashMap<>();// 将队列绑定到死信交换机mapp上.put("x-dead-letter-exchange", "deadExchange");map.put("x-dead-letter-routing-key", "deadRouting");return map;}}

arguments参数说明:

SpringBoot RabbitMQ死信队列_java_02

  1. Auto expire: 在自动删除之前,队列可以使用多长时间(毫秒)?(x-expires参数)
  2. Message TTL: 在被丢弃之前,发布到队列的消息能持续多久(毫秒)。(x-message-ttl参数)
  3. Overflow behaviour: 设置队列溢出行为。这决定了当队列达到最大长度时会发生什么。drop的有效值-head(删除queue头部的消息)、reject-publish(最近的消息将被丢弃)或rejectt-publish-dlx(拒绝向死信交换器发送消息)。只支持drop-head和拒绝-发布仲裁队列类型。(x-overflow参数
  4. Single active consumer: 如果设置,确保每次只从队列中使用一个用户,活动用户将被取消或死亡-dead-letter-当exchange死亡时,故障转移到另一个注册用户。(x-single-active-consumer参数)
  5. Dead letter exchange: 如果消息被拒绝或过期,可选的死信交换机将重新发布到死信交换机。(x-dead-letter-exchange参数)
  6. Dead letter routing key: 当消息是死信时使用的可选替换路由键。如果没有设置此值,则使用原始路由键。(x-dead-letter-routing-key参数)
  7. Max length: 在开始从头开始丢弃消息之前,一个队列能包含多少(准备好了)消息。(x-max-length参数)
  8. Max length bytes: 在开始从头部丢弃消息之前,队列所能包含的就绪消息的总正文大小。(x-max-length-bytes参数)
  9. Leader locator:将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。(x-queue-leader-locator参数)
3. 队列达到的最大长度

没有消费者消息;调用6个正常队列的消息制作方法;消息数量超过队列长度。

package com.olive.controller;import java.util.HashMap;import java.util.Map;import java.util.UUID;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class TestController {@Autowiredprivate RabbitTemplate rabbitTemplate;/** * 正常消息队列,队列最大长度5 */@GetMapping("/normalQueue")public String normalQueue() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", System.currentTimeMillis() + ", 正常队列消息,最大长度为5);rabbitTemplate.convertAndSend("normalExchange", "normalRouting", map, new CorrelationData());return "success";}}

访问6次,发送6条信息

http://127.0.0.1:8080/normalQueue

在RabbitMQ管理后台查看结果:

SpringBoot RabbitMQ死信队列_java_03

4. 新闻TTL过期了

信息TTL是指信息存活时间,可以通过设置信息TTL或队列TTL来实现。

  • TTL的消息: 设置过期时间属性(expiration)如果消息在过期时间内没有消费,就会过期。
  • TTL队列: 设置过期时间属性(x-message-ttl)队列,所有路由到这个队列的消息,都会设定这个过期时间。

两种配置都可以由用户使用,通常用于定期任务和限时支付场景。

/*** 消息 TTL, time to live*/@GetMapping("/ttlToDead")public String ttlToDead() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", System.currentTimeMillis() + ", ttl队列消息);rabbitTemplate.convertAndSend("normalExchange", "ttlRouting", map, new CorrelationData());return "success";}

访问6次,发送6条信息

http://127.0.0.1:8080/ttlToDead

从RabbitMQ管理后台查看结果,发送消息后

SpringBoot RabbitMQ死信队列_rabbitmq_04

从RabbitMQ管理的后台查看结果,等待消息过期

SpringBoot RabbitMQ死信队列_java_05

建议在不使用队列TTL的情况下,尽量在项目中使用新闻TL

5.拒绝消息

正常队列消费后拒绝消息,不重新入队

package com.olive.config;import java.io.IOException;import java.util.Map;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component@RabbitListener(queues = "normalQueue")public class NormalConsumer {@RabbitHandlerpublic void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {System.out.println(”收到消息,并拒绝重新入队:" + message.toString());channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);}}

在RabbitMQ管理后台查看结果

SpringBoot RabbitMQ死信队列_java_06

死信队列消息消费消息6.
package com.olive.config;import java.io.IOException;import java.util.Map;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Component@RabbitListener(queues = "deadQueue")public class DeadConsumer {@RabbitHandlerpublic void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {System.out.println(“死信队列收到消息” + message.toString());channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);}}

从RabbitMQ管理的后台查看结果,死信队列消息被完全消费

SpringBoot RabbitMQ死信队列_java_07

 

blog.csdn.net/java_zhaobobo/article/details/124942704