首页 > 图灵资讯 > 技术篇>正文

深入学习RabbitMQ五种模式(二)

2023-05-04 10:44:19

1.工作模式

工作模式也被称为任务模型(Task Queues)。当消息处理耗时时时,可能会比消息消费更快地生产消息。从长远来看,消息会越来越多,无法及时处理。这个时候可以用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。一旦队列中的消息消失,任务就不会重复执行。

这种模式只有一个生产者Producer,一个用于存储信息的队列 Queue、多个消费者Consumer用于接收消息。

深入学习RabbitMQ五种模式(二)_发布订阅

工作队列模式有三个特点:

  • 一个生产者,一个队列,多个消费者同时竞争新闻
  • 任务量过高可以提高工作效率
  • 消费者得到的消息是无序的
1.1. 创建生产者

生产者向队列发送10条消息

package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(工作模式) */public class WorkerProducer {    /**队列名*//    private static final String QUEUE_NAME = "work_queue";    public static void main(String[] args) throws Exception {        // 1、创建连接        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道        Channel channel = connection.createChannel();        // 3、声明队列 queueDeclare(队列名称,是否持久,是否独占本连接,是否自动删除,附加属性参数)        channel.queueDeclare(QUEUE_NAME, true, false, false, null);        // 4、发送10条消息        for (int i = 1; i <= 10; i++) {            String msg = "Hello World RabbitMQ!!!!" + i;            System.out.println(“生产者发消息:” + msg);            // basicPublish(交换机名称-”表示不需要交换机,队列名称或routingKey, 新闻属性信息, 字节数组的新闻内容);            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());        }        ///释放资源        channel.close();        connection.close();    }}
1.2. 创建消费者

创建两个消费者Workerconsumer1和Workerconsumer2

  • Workerconsumer.java
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者1(工作模式) */public class Workerconsumer {    /**队列名*//    private static final String QUEUE_NAME = "work_queue";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、创建队列Queue,如果没有一个叫work_queue的队列,就会创建这个队列,如果有,就不会创建.        // 这里是可有可无的,但必须有这个队列发送消息,否则消息会丢失        channel.queueDeclare(QUEUE_NAME, true, false, false, null);        // 4、监控队列,接收信息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者发送时指定), 读到的信息)            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println“消费者获取消息:” + new String(body));                // 模拟消息处理延迟,增加线程睡眠时间                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        // basicConsume(队列名称, 是否自动确认, 回调对象)        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);        //注意,消费者不建议关闭这里的资源,这样程序就可以一直阅读信息    }}
  • Workerconsumer2.java
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者2(工作模式) */public class Workerconsumer2 {    /**队列名*//    private static final String QUEUE_NAME = "work_queue";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、创建队列Queue,如果没有一个叫work_queue的队列,就会创建这个队列,如果有,就不会创建.        // 这里是可有可无的,但必须有这个队列发送消息,否则消息会丢失        channel.queueDeclare(QUEUE_NAME, true, false, false, null);        // 4、监控队列,接收信息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者发送时指定), 读到的信息)            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                System.out.println“消费者获取消息:” + new String(body));                // 模拟消息处理延迟,增加线程睡眠时间                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };        // basicConsume(队列名称, 是否自动确认, 回调对象)        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);        //注意,消费者不建议关闭这里的资源,这样程序就可以一直阅读信息    }}

消费者2和消费者1的代码逻辑完全相同

1.3. 验证

首先,分别启动两个消费者**(注意这里必须先启动消费者)**

深入学习RabbitMQ五种模式(二)_发布订阅_02

从RabbitMQ管理的后台查看,已经创建了work_queue队列。

启动生产者,分别查看消费者1和消费者2控制台的打印信息

消费者1Workerconsumer

深入学习RabbitMQ五种模式(二)_发送消息_03

消费者2Workerconsumer2

深入学习RabbitMQ五种模式(二)_发送消息_04

从两个消费者控制台的打印结果来看,两个消费者消费的消息就像是轮询消费。

  • 轮询分发(round-robin)

以上实现的是轮询分发的方式。

现象:消费者1处理消息后,消费者2可以处理,两者轮流处理消息,直到消息处理完成。这种方法被称为轮询分发(round-robin),因此,无论两个消费者谁忙,数据总是你消费一个,我消费一个,无论消费者处理数据的性能如何,此时autoack = true。

/*** @param queue 队列名称* @param autoAck true自动确认是否自动发送确认,意味着收到消息后,在队列中自动删除消息;false手动发送ack确认消息* @param callback 回调对象*////String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

注:autoack属性设置为true,表示信息自动确认。消费者消费时消息的确认模式可分为:自动确认和手动确认。

自动确认:消费者读取队列中的消息后,将自动从队列中删除。消息将被删除,无论消费者是否成功消费。

手动确认:当消费者阅读消息时,消费者需要手动发送ACK来确认消息已经成功消费(即需要编写代码发送ACK确认),如果设置为手动确认而不发送ACK确认,那么消息将始终存在于队列中(前提是持久操作),随后可能导致消息重复消费,如果队列中积累的消息过多,也可能导致内存溢出,处理消息后,手动确认消费者应及时向队列发送ACK确认。

轮询分发的使用会有明显的缺点。比如消费者1处理数据的效率很慢,消费者2处理数据的效率很高。正常情况下,消费者2处理的数据应该多一点,而轮询分发无论你的表现如何,都是每次处理一个消息。这种情况可以通过公平分发来解决。

  • 公平分发(fair dipatch)

要实现公平分配,需要进行以下修改:

  1. 消费者:保证消息只分发一次
  2. 消费者:关闭自动确认,手动向队列发送ACK

深入学习RabbitMQ五种模式(二)_发送消息_05

修改后再次运行。由于消费者1设置处理消息后睡眠2秒,消费者睡眠2秒,预期输出结果为:消费者2处理消息的速度约为消费者1的两倍,结果如下。

消费者1

深入学习RabbitMQ五种模式(二)_发送消息_06

消费者2

深入学习RabbitMQ五种模式(二)_发布订阅_07

2.发布订阅模式

发布订阅模式(Publish/Subscribe):该模式涉及交换机,也可称为广播模式,通过交换机将新闻广播到所有与之绑定的队列中。

一个消费者首先将消息发送到交换机上(这里的交换机类型是fanout),然后将交换机绑定到多个队列,这样每个发送到fanout类型交换机的消息都会分发到所有队列,最后被监控队列的消费者接收和消费。如下图所示:

研究RabitMQ五种模式(二)_发送消息_08

  • 创建生产者
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(发布订阅模式) */public class PubSubProducer {    // 交换机名称    private static final String EXCHANGE_NAME = "fanout_exchange";    public static void main(String[] args) throws Exception {        // 1、创建连接        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道        Channel channel = connection.createChannel();        // 3、连续发送10条信息        for (int i = 1; i <= 10; i++) {            String msg = "Hello World RabbitMQ!!!!~~~" + i;            System.out.println(”生产者发送的消息:" + msg);            //basicPublish(交换机名称[默认Defaultttdefaultt Exchage],其他属性,路由key[简单模式可以传递队列名称],发送的消息内容)            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());        }        ///关闭资源        channel.close();        connection.close();    }}
  • 创建消费者

由于从这里开始涉及交换机,这里介绍四种交换机的类型:

  1. direct(直连):新闻中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器将信息发送到相应的队列。基于完全匹配和单播的模式。
  2. fanout(广播):将所有发送到fanout交换器的消息路由发送到所有绑定交换器的队列中,fanout 类型转发信息最快。
  3. topic(主题):信息路由通过模式匹配,路由键与某个模式匹配,此时队列需要绑定到一个模式。匹配规则:

① RoutingKey 和 BindingKey 为一个 点号 '.' 分开的字符串。 比如: stock.usd.nyse;routing_key中可以放任何key,当然最长不能超过255 bytes。

② 可以使用BindingKey * 和 # 用于模糊匹配:*匹配一个单词,#匹配0个或多个单词;

  1. headers:不依赖路由键匹配,而是根据发送消息内容中的headers属性进行匹配。此外,headers交换器与direct交换器完全一致,但性能要差得多,目前几乎没有使用。

消费者1

注:在发送消息之前,RabitMQ服务器中必须有一个队列,否则消息可能会丢失。如果还涉及到交换机和队列绑定,则必须首先声明交换机、队列并设置绑定路由值(Routing Key),为了避免程序异常,因为本例中的所有声明都在消费者中,所以我们必须首先启动消费者。如果RabitMQ服务器中已经有声明的队列或交换机,则不会创建。如果没有,则创建相应名称的队列或交换机。

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者1(发布订阅模式) */public class Pubsubconsumer {    // 队列名称    private static final String QUEUE_NAME1 = "fanout_queueue1";    // 交换机名称    private static final String EXCHANGE_NAME = "fanout_exchange";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        /* 3、声明交换机         * exchange  参数1:交换机名称         * type      参数2:交换机类型         * durable   参数3:交换机是否持久         */        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);        // 4、声明队列Queue queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);        // 5、绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型是fanout ,将routingKey设置为“”)        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");        // 6、监听队列,接收消息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                //获取交换机信息                String exchange = envelope.getExchange();                //获取消息信息                String message = new String(body, "utf-8");                System.out.println(“交换机名称:” + exchange + ",消费者获取消息: " + message);            }        };        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);        //请注意,消费者不建议在这里关闭资源,让程序始终处于读取消息的状态    }}

消费者2

消费者1基本相同,但队列名称不同

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者2(发布订阅模式) */public class ubsubconsumer2 {// privatet队列名称 static final String QUEUE_NAME2 = "fanout_queueue2”;// privatete交换机的名称 static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {// 1、Connection获取连接对象 connection = ConnectionUtils.getConnection();// 2、创建通道(频道)Channel channel = connection.createChannel();// 3、声明交换机,如果没有创建EXCHANGE_NAME的交换机,则没有创建Chanel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);// 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)channel.queueDeclare(QUEUE_NAME2, true, false, false, null);// 5、绑定队列和交换机。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)channel.queueDeclare(QUEUE_NAME2, true, false, false, null);// 5、绑定队列和交换机。channel.queueBind(队列名, 交换机名, 路由key[fanout交换机routingkey设置为“”])channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");// 6、监听队列,Defaultconsumer接收消息 defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// String获取交换机信息 exchange = envelope.getExchange();// String获取消息信息 message = new String(body, "utf-8");System.out.println(“交换机名称:” + exchange + ",消费者获取消息: " + message);}};channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);// 注意,这里不建议消费者关闭资源,让程序一直处于读取消息的状态}}
  • 验证

首先,启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台上查看生产者发送的所有信息;实现广播效果。

消费者1

深入学习RabbitMQ五种模式(二)_发送消息_09

消费者2

深入学习RabbitMQ五种模式(二)_发布订阅_10

执行测试代码后,在RabitMQ管理后台找到Exchanges选项卡,点击fanout_exchange交换机,可查看以下绑定:

深入学习RabbitMQ五种模式(二)_发送消息_11

深入学习RabbitMQ五种模式(二)_java_12

fanout_exchange是代码中定义的交换机的名称;fanout_queue1fanout_queue2消费者1和消费者2在代码中定义的两个队列的名称

  • 总结

发布订阅模式引入了交换机的概念,因此它比以前的类型更灵活、更广泛。该模式需要设置一个类型为fanout的交换机,并绑定交换机和队列。当信息发送到交换机时,交换机会将信息发送到所有绑定的队列,最终被监控队列的消费者接收和消费。发布订阅模式也可以称为广播模式,不需要RoutingKey的判断。

发布订阅模式与工作队列模式的区别:

  1. 工作队列模式不需要定义交换机,而发布/订阅模式需要定义交换机。
  2. 发布/订阅模式的制造商向交换机发送信息,工作队列模式的制造商向队列发送信息(底部使用默认交换机)。
  3. 发布/订阅模式需要设置队列和交换机绑定,不需要设置工作队列模式。事实上,工作队列模式将队列绑定到默认交换机 。

上一篇 深入学习RabbitMQ五种模式(一)
下一篇 深入学习RabbitMQ五种模式(三)

文章素材均来源于网络,如有侵权,请联系管理员删除。