首页 > 图灵资讯 > 技术篇>正文

深入学习RabbitMQ五种模式(三)

2023-05-04 10:44:36

 

1.路由模式(精确匹配)

路由模式(Routing)的特点:

  • 该模式的交换机为direct,即定向发送和精确匹配。
  • 队列和交换机的绑定不能随意绑定,而是指定RoutingKey(路由Key)
  • 当消息发送方向Exchange发送消息时,还必须指定消息 RoutingKey。
  • Exchange不再将消息交给每个绑定的队列,而是根据消息的RoutingKey来判断,只有队列的Routingkey和消息的Routing 只有key完全一致,才会收到消息。

制造商将消息发送到direct交换器,制造商在发送消息时指定路由key,并在绑定队列和交换器时指定路由key,因此消息只发送到相应的routing key队列相同,然后监控消费者消费消息。模型如下图所示:

深入学习RabbitMQ五种模式(三)_发送消息

  • 创建生产者
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(路由模式) */public class RoutorProducer {    // 交换机名称    private static final String EXCHANGE_NAME = "routing_exchange";    public static void main(String[] args) throws Exception {        // 1、创建连接        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、发送消息,连续发3条        for (int i = 0; i < 3; i++) {            String routingKey = "";            ///发送消息时,根据相关逻辑指定相应的routing key。            switch (i) {                case 0:  //假设i=0,为error消息                    routingKey = "error";                    break;                case 1: //假设i=1,为info新闻服务                    routingKey = "info";                    break;                case 2: //假设i=2,warnining消息                    routingKey = "warning";                    break;            }            // 要发送的消息            String message = "Hello World Message!!!!~~~" + routingKey;            // 消息发送 channel.basicPublish(交换机名称,路由key,其他属性的消息,内容)            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));            System.out.println(”生产者发送的消息:" + message);        }        ///释放资源        channel.close();        connection.close();    }}
  • 创建消费者

消费者1

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者1(路由模式) */public class Routorcunsumer {    // 队列名称    private static final String QUEUE_NAME1 = "routing_queueue1";    // 交换机名称    private static final String EXCHANGE_NAME = "routing_exchange";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、声明交换机(如果有,则不会创建,无则创建) channel.exchangeDeclare(交换机名称,交换机类型,是否持久)        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "error");        // 6、监听队列,接收消息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                ////key获取路由                String routingKey = envelope.getRoutingKey();                //获取交换机信息                String exchange = envelope.getExchange();                //获取消息信息                String message = new String(body, "utf-8");                System.out.println(路由Key:" + routingKey + ", 交换机名称:“ + exchange + ", 消费者获取消息: " + message);            }        };        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);        //请注意,消费者不建议在这里关闭资源,让程序始终处于读取消息的状态    }}

消费者2

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者2(路由模式) */public class Routorcunsumer2 {    // 队列名称    private static final String QUEUE_NAME2 = "routing_queueue2”;    // 交换机名称    private static final String EXCHANGE_NAME = "routing_exchange";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名称,交换机类型,是否持久)        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "error");        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "info");        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");        // 6、监听队列,接收消息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                ////key获取路由                String routingKey = envelope.getRoutingKey();                //获取交换机信息                String exchange = envelope.getExchange();                //获取消息信息                String message = new String(body, "utf-8");                System.out.println(路由Key:" + routingKey + ", 交换机名称:“ + exchange + ", 消费者获取消息: " + message);            }        };        channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);    }}
  • 验证

先启动所有消费者,再启动生产者发送消息;在消费者对应的控制台上,可以看到生产者发送相应的routing key对应队列的消息;达到需要接收的效果。

消费者1绑定的交换机和队列的路由Key是error,所以只要生产者在发送消息时带有error的routingKey,就可以获得消息。

深入学习RabbitMQ五种模式(三)_spring_02

Key是消费者2绑定的交换机和队列的error、info、warning,因此,只要生产者在发送消息时带有这三种routingKey,它就能得到消息。

深入学习RabbitMQ五种模式(三)_java_03

相应的交换机也可以从RabitMQ管理的后台看到,以及队列绑定

深入学习RabbitMQ五种模式(三)_java_04

  • 总结
  1. Routing模式需要将交换机设置为Direct类型。
  2. Routing模式要求队列在绑定交换机时指定routing key,将信息发送到交换机后,交换机会根据routing发送 key将消息发送到相应的队列。
2.Topic模式(模糊匹配)

与Direct相比,Topic类型可以根据RoutingKey将信息路由到不同的队列。但是Topic类型的Exchange可以让队列绑定到Routing key使用通配符进行匹配,即模糊匹配,比以前的模式更灵活!

以Topic为主题的Routingkey通常由一个或多个单词组成,多个单词之间有“.“分割,例如: log.insert ,其通配符规则如下:

*:匹配不多或多或少只是一个单词

#:匹配0或多个单词

简单举例:

log.*:只能匹配log.error,log.info等log.#:能够匹配log.insert,log.insert.abc,log.news.update.abc等

 

深入学习RabbitMQ五种模式(三)_rabbitmq_05

深入学习RabbitMQ五种模式(三)_java_06

图解:

  1. 红色Queue:usa是绑定的.# ,因此凡是以usa.routing开头 所有的key都会匹配
  2. 黄色Queue:绑定的是#.news ,因此凡是以.news结尾的 routing 所有的key都会匹配
  • 创建生产者
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(Topic主题模式) */public class TopicProducer {    // 交换机名称    private static final String EXCHANGE_NAME = "topic_exchange";    public static void main(String[] args) throws Exception {        // 1、创建连接        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、发送消息        for (int i = 0; i < 4; i++) {            String routingKey = "";            ///发送消息时,根据相关逻辑指定相应的routing key。            switch (i) {                case 0:  //假设i=0,为select新闻                    routingKey = "log.select";                    break;                case 1: //假设i=1,为info新闻服务                    routingKey = "log.delete";                    break;                case 2: //假设i=2,为log.news.add消息                    routingKey = "log.news.add";                    break;                case 3: //假设i=3,为log.news.update消息                    routingKey = "log.news.update";                    break;            }            // 要发送的消息            String message = "Hello Message!!!!~~~" + routingKey;            // 消息发送 channel.basicPublish(交换机名称,路由key,其他属性的消息,内容)            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));            System.out.println(”生产者发送的消息:" + message);        }        // 关闭资源        channel.close();        connection.close();    }}
  • 创建消费者

消费者1

接收所有与log.*相匹配的路由key队列中的消息

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者(Topic模式) */public class Topiconsumer {    // 队列名称    private static final String QUEUE_NAME1 = "topic_queueue1";    // 交换机名称    private static final String EXCHANGE_NAME = "topic_exchange";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、声明交换机(如果有,则不会创建,无则创建) channel.exchangeDeclare(交换机名称,交换机类型,是否持久)        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);        // 4、声明队列Queue channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);        // 5、根据指定的routingKey绑定队列和交换机,设置路由Key channel.queueBind(队列名, 交换机名, 路由key)        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "log.*");        // 6、监听队列,接收消息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                ////key获取路由                String routingKey = envelope.getRoutingKey();                //获取交换机信息                String exchange = envelope.getExchange();                //获取消息信息                String message = new String(body, "utf-8");                System.out.println(路由Key:" + routingKey + ", 交换机名称:“ + exchange + ", 消费者获取消息: " + message);            }        };        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);        //请注意,消费者不建议在这里关闭资源,让程序始终处于读取消息的状态    }}

消费者2

接收所有与`log.``#相匹配的路由key队列中的消息

package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者(Topic模式) */public class Topiconsumer {    // 队列名称    private static final String QUEUE_NAME2 = "topic_queueue2”;    // 交换机名称    private static final String EXCHANGE_NAME = "topic_exchange";    public static void main(String[] args) throws Exception {        // 1、获取连接对象        Connection connection = ConnectionUtils.getConnection();        // 2、创建通道(频道)        Channel channel = connection.createChannel();        // 3、声明交换机(如果有,则不会创建,无则创建) channel.exchangeDeclare(交换机名称,交换机类型,是否持久)        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久,是否独家连接,是否自动删除,附加参数)        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "log.#");        // 6、监听队列,接收消息        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                //key获取路由                String routingKey = envelope.getRoutingKey();                //获取交换机信息                String exchange = envelope.getExchange();                //获取消息信息                String message = new String(body, "utf-8");                System.out.println(路由Key:" + routingKey + ", 交换机名称:“ + exchange + ", 消费者获取消息: " + message);            }        };        channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);    }}

首先启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台上,可以看到生产者发送相应的routing key对应队列的消息;达到需要接收的效果。

消费者1的路由key匹配规则是log.*,绑定所有路由规则的队列应只有两条信息。

深入学习RabbitMQ五种模式(三)_rabbitmq_07

消费者2的路由key匹配规则是log.#,可与log相匹配.开头所有路由key,所有绑定路由规则的队列应该只有4条信息。

深入学习RabbitMQ五种模式(三)_发送消息_08

相应的交换机也可以从RabitMQ管理的后台看到,以及队列绑定

深入学习RabbitMQ五种模式(三)_java_09

  • 总结

Topic主题模式需要设置类型为Topic的交换机,绑定交换机和队列,并指定routing的通配符模式 key,将信息发送到交换机后,交换机会根据routing发送 key将消息发送到相应的队列。

Publish/Subscribe发布和订阅模式可以实现Topic主题模式 以及Routing路由模式的功能;只有Topic配置routing key 通配符可以使用,所以更灵活。

上一篇 深入学习RabbitMQ五种模式(二)
下一篇 SpringBoot RabbitMQ死信队列

文章素材均来源于网络,如有侵权,请联系管理员删除。