1.工作模式
工作模式也被称为任务模型(Task Queues)。当消息处理耗时时时,可能会比消息消费更快地生产消息。从长远来看,消息会越来越多,无法及时处理。这个时候可以用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。一旦队列中的消息消失,任务就不会重复执行。
这种模式只有一个生产者Producer,一个用于存储信息的队列 Queue、多个消费者Consumer用于接收消息。
工作队列模式有三个特点:
- 一个生产者,一个队列,多个消费者同时竞争新闻
- 任务量过高可以提高工作效率
- 消费者得到的消息是无序的
生产者向队列发送10条消息
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(工作模式) */public class WorkerProducer { /**队列名*// private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、声明队列 queueDeclare(队列名称,是否持久,是否独占本连接,是否自动删除,附加属性参数) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、发送10条消息 for (int i = 1; i <= 10; i++) { String msg = "Hello World RabbitMQ!!!!" + i; System.out.println(“生产者发消息:” + msg); // basicPublish(交换机名称-”表示不需要交换机,队列名称或routingKey, 新闻属性信息, 字节数组的新闻内容); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } ///释放资源 channel.close(); connection.close(); }}
1.2. 创建消费者
创建两个消费者Workerconsumer1和Workerconsumer2
- Workerconsumer.java
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者1(工作模式) */public class Workerconsumer { /**队列名*// private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、创建队列Queue,如果没有一个叫work_queue的队列,就会创建这个队列,如果有,就不会创建. // 这里是可有可无的,但必须有这个队列发送消息,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监控队列,接收信息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者发送时指定), 读到的信息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println“消费者获取消息:” + new String(body)); // 模拟消息处理延迟,增加线程睡眠时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //注意,消费者不建议关闭这里的资源,这样程序就可以一直阅读信息 }}
- Workerconsumer2.java
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者2(工作模式) */public class Workerconsumer2 { /**队列名*// private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、创建队列Queue,如果没有一个叫work_queue的队列,就会创建这个队列,如果有,就不会创建. // 这里是可有可无的,但必须有这个队列发送消息,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监控队列,接收信息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者发送时指定), 读到的信息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println“消费者获取消息:” + new String(body)); // 模拟消息处理延迟,增加线程睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //注意,消费者不建议关闭这里的资源,这样程序就可以一直阅读信息 }}
1.3. 验证消费者2和消费者1的代码逻辑完全相同
首先,分别启动两个消费者**(注意这里必须先启动消费者)**
从RabbitMQ管理的后台查看,已经创建了work_queue
队列。
启动生产者,分别查看消费者1和消费者2控制台的打印信息
消费者1Workerconsumer
消费者2Workerconsumer2
从两个消费者控制台的打印结果来看,两个消费者消费的消息就像是轮询消费。
- 轮询分发(round-robin)
以上实现的是轮询分发的方式。
现象:消费者1处理消息后,消费者2可以处理,两者轮流处理消息,直到消息处理完成。这种方法被称为轮询分发(round-robin),因此,无论两个消费者谁忙,数据总是你消费一个,我消费一个,无论消费者处理数据的性能如何,此时autoack = true。
/*** @param queue 队列名称* @param autoAck true自动确认是否自动发送确认,意味着收到消息后,在队列中自动删除消息;false手动发送ack确认消息* @param callback 回调对象*////String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
注:autoack属性设置为true,表示信息自动确认。消费者消费时消息的确认模式可分为:自动确认和手动确认。
自动确认:消费者读取队列中的消息后,将自动从队列中删除。消息将被删除,无论消费者是否成功消费。
手动确认:当消费者阅读消息时,消费者需要手动发送ACK来确认消息已经成功消费(即需要编写代码发送ACK确认),如果设置为手动确认而不发送ACK确认,那么消息将始终存在于队列中(前提是持久操作),随后可能导致消息重复消费,如果队列中积累的消息过多,也可能导致内存溢出,处理消息后,手动确认消费者应及时向队列发送ACK确认。
轮询分发的使用会有明显的缺点。比如消费者1处理数据的效率很慢,消费者2处理数据的效率很高。正常情况下,消费者2处理的数据应该多一点,而轮询分发无论你的表现如何,都是每次处理一个消息。这种情况可以通过公平分发来解决。
- 公平分发(fair dipatch)
要实现公平分配,需要进行以下修改:
- 消费者:保证消息只分发一次
- 消费者:关闭自动确认,手动向队列发送ACK
修改后再次运行。由于消费者1设置处理消息后睡眠2秒,消费者睡眠2秒,预期输出结果为:消费者2处理消息的速度约为消费者1的两倍,结果如下。
消费者1
消费者2
2.发布订阅模式发布订阅模式(Publish/Subscribe):该模式涉及交换机,也可称为广播模式,通过交换机将新闻广播到所有与之绑定的队列中。
一个消费者首先将消息发送到交换机上(这里的交换机类型是fanout),然后将交换机绑定到多个队列,这样每个发送到fanout类型交换机的消息都会分发到所有队列,最后被监控队列的消费者接收和消费。如下图所示:
- 创建生产者
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(发布订阅模式) */public class PubSubProducer { // 交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、连续发送10条信息 for (int i = 1; i <= 10; i++) { String msg = "Hello World RabbitMQ!!!!~~~" + i; System.out.println(”生产者发送的消息:" + msg); //basicPublish(交换机名称[默认Defaultttdefaultt Exchage],其他属性,路由key[简单模式可以传递队列名称],发送的消息内容) channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } ///关闭资源 channel.close(); connection.close(); }}
- 创建消费者
由于从这里开始涉及交换机,这里介绍四种交换机的类型:
- direct(直连):新闻中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器将信息发送到相应的队列。基于完全匹配和单播的模式。
- fanout(广播):将所有发送到fanout交换器的消息路由发送到所有绑定交换器的队列中,fanout 类型转发信息最快。
- topic(主题):信息路由通过模式匹配,路由键与某个模式匹配,此时队列需要绑定到一个模式。匹配规则:
① RoutingKey 和 BindingKey 为一个 点号 '.' 分开的字符串。 比如: stock.usd.nyse;routing_key中可以放任何key,当然最长不能超过255 bytes。
② 可以使用BindingKey * 和 # 用于模糊匹配:*匹配一个单词,#匹配0个或多个单词;
- headers:不依赖路由键匹配,而是根据发送消息内容中的headers属性进行匹配。此外,headers交换器与direct交换器完全一致,但性能要差得多,目前几乎没有使用。
消费者1
注:在发送消息之前,RabitMQ服务器中必须有一个队列,否则消息可能会丢失。如果还涉及到交换机和队列绑定,则必须首先声明交换机、队列并设置绑定路由值(Routing Key),为了避免程序异常,因为本例中的所有声明都在消费者中,所以我们必须首先启动消费者。如果RabitMQ服务器中已经有声明的队列或交换机,则不会创建。如果没有,则创建相应名称的队列或交换机。
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者1(发布订阅模式) */public class Pubsubconsumer { // 队列名称 private static final String QUEUE_NAME1 = "fanout_queueue1"; // 交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); /* 3、声明交换机 * exchange 参数1:交换机名称 * type 参数2:交换机类型 * durable 参数3:交换机是否持久 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true); // 4、声明队列Queue queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // 5、绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型是fanout ,将routingKey设置为“”) channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ""); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println(“交换机名称:” + exchange + ",消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME1, true, defaultConsumer); //请注意,消费者不建议在这里关闭资源,让程序始终处于读取消息的状态 }}
消费者2
消费者1基本相同,但队列名称不同
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者2(发布订阅模式) */public class ubsubconsumer2 {// privatet队列名称 static final String QUEUE_NAME2 = "fanout_queueue2”;// privatete交换机的名称 static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {// 1、Connection获取连接对象 connection = ConnectionUtils.getConnection();// 2、创建通道(频道)Channel channel = connection.createChannel();// 3、声明交换机,如果没有创建EXCHANGE_NAME的交换机,则没有创建Chanel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)channel.queueDeclare(QUEUE_NAME2, true, false, false, null);// 5、绑定队列和交换机。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)channel.queueDeclare(QUEUE_NAME2, true, false, false, null);// 5、绑定队列和交换机。channel.queueBind(队列名, 交换机名, 路由key[fanout交换机routingkey设置为“”])channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");// 6、监听队列,Defaultconsumer接收消息 defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// String获取交换机信息 exchange = envelope.getExchange();// String获取消息信息 message = new String(body, "utf-8");System.out.println(“交换机名称:” + exchange + ",消费者获取消息: " + message);}};channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);// 注意,这里不建议消费者关闭资源,让程序一直处于读取消息的状态}}
- 验证
首先,启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台上查看生产者发送的所有信息;实现广播效果。
消费者1
消费者2
执行测试代码后,在RabitMQ管理后台找到Exchanges选项卡,点击fanout_exchange
交换机,可查看以下绑定:
fanout_exchange
是代码中定义的交换机的名称;fanout_queue1
和fanout_queue2
消费者1和消费者2在代码中定义的两个队列的名称
- 总结
发布订阅模式引入了交换机的概念,因此它比以前的类型更灵活、更广泛。该模式需要设置一个类型为fanout的交换机,并绑定交换机和队列。当信息发送到交换机时,交换机会将信息发送到所有绑定的队列,最终被监控队列的消费者接收和消费。发布订阅模式也可以称为广播模式,不需要RoutingKey的判断。
发布订阅模式与工作队列模式的区别:
- 工作队列模式不需要定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的制造商向交换机发送信息,工作队列模式的制造商向队列发送信息(底部使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机绑定,不需要设置工作队列模式。事实上,工作队列模式将队列绑定到默认交换机 。