深入学习RabbitMQ五种模式(一)
2023-05-04 10:43:59
1.安装erlangg安装
下载otp_win64_25.3.exe
https://www.erlang.org/downloads
erlang安装完成后,需要配置erlang环境变量
ERLANG_HOME=E:\software\Erlang OTPPATH=%PATH%;%ERLANG_HOME%\bin;
2.安装RabbitMQ
下载rabbitmq-server-3.11.13.exe
https://www.rabbitmq.com/download.html
进入安装目录下的sbin目录,安装和运行服务
安装服务: rabbitmq-service.bat install删除服务:rabbitmq-service.bat remove启动服务:rabbitmq-service.bat start停止服务: rabbitmq-service.bat stop
安装管理插件
在浏览器端安装RabbitMQ管理插件,方便RabbitMQ管理
管理员身份打开cmd,进入E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\sbin
目录,运行
rabbitmq-plugins.bat enable rabbitmq_management
执行结果
重启RabbitMQ
成功启动,登录RabitMQ
访问地址http://127.0.0.1:15672/
;初始帐号和密码guest/guest
。
这种模式是一对一的模式,只有一个生产者Producer(用于制作新闻),一个队列Queue(用于存储新闻),一个消费者Consumer (用于接收信息)。
注:交换机也用于简单模式,默认交换机用于(AMQP default)。
- 创建项目
rabbitmq-learn
pom.xml引入以下依赖性
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.olive</groupId><artifactId>rabbitmq-learn</artifactId><version>0.0.1-SNAPSHOT</version><properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> </properties><dependencies><!-- mq的依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- 日志处理 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies></project>
RabbitMQ正式提供amqp-client
Java客户端连接RabitMQ Server;仓库地址如下
https://github.com/rabbitmq/rabbitmq-java-client
- RabbitMQ连接的工具类
package com.olive;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * 封装连接工具类 */public class ConnectionUtils { public static Connection getConnection() throws Exception { // 1.定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置服务器地址 factory.setHost("127.0.0.1"); // 3.设置协议端口号 factory.setPort(5672); // 4.虚拟主机名称;默认为; / factory.setVirtualHost("/"); // 5.设置用户名 factory.setUsername("admin"); // 6.设置用户密码 factory.setPassword(admin123); // 7.创建连接 Connection connection = factory.newConnection(); return connection; }}
在RabitMQ管理后台创建admin用户;可使用默认guest用户。
- 创建生产者
生产者负责创建信息并将信息发送到指定的队列,简单分为五个步骤:
创建连接 ——> 创建通道 ——> 创建(声明)队列 ——> 发送消息 ——> 关闭资源
package com.olive;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;/** * 生产者(简单模式) */public class SimpleProducer { /**队列名*// private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明(创建)队列 /* * queue 参数1:声明通道中相应的队列名称 * durable 参数2:是否定义持久队列,mq重启后,队列仍在继续 * exclusive 参数3:对于true来说,只有一个消费者可以监控这个队列吗? * autoDelete 参数4:是否自动删除队列,如果true说没有消息,没有消费者连接,自动删除队列 * arguments 参数5:队列其他参数(额外配置) */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4.发送消息 /* * exchange 参数1:如果没有指定交换机的名称,则使用默认Default Exchange * routingKey 参数2:队列名称或routingKey,如果指定交换机是routingKey路由key,简单的模式可以传递队列名称 * props 参数3:信息配置信息 * body 参数4:要发送的消息内容 */ String msg = "Hello World RabbitMQ!!!!"; System.out.println(”生产者发送的消息:" + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); ///关闭资源 channel.close(); connection.close(); }}
- 创建消费者
消费者的实现与生产者的实现过程相似,但没有关闭渠道和连接,因为消费者必须随时等待可能的消息,大致分为以下三个步骤:
获取连接 ——> 创建通道 ——> 监控队列,接收信息
package com.olive;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;/** * 消费者(简单模式) */public class SimpleConsumer { /**队列名*// private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3. 创建队列Queue,如果没有一个叫simple_world的队列,就会创建这个队列,如果有,就不会创建. // 这里是可有可无的,但必须有这个队列发送消息,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监控队列,接收信息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /* * handledelivery回调方法,收到消息后,该方法将自动执行 * consumerTag 参数1:消费者标识 * envelope 参数2:一些信息可以获得,比如交换机,路由key... * properties 参数3:配置信息 * body 参数4:读到的信息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println“消费者获取消息:” + new String(body)); } }; /* * queue 参数1:队列名称 * autoAck 参数2:是否自动确认,true表示,在自动确认收到消息后,消息将自动从队列中删除。否则,需要手动ack消息 * callback 参数3:上面定义了回调对象 */ channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //请注意,消费者不建议在这里关闭资源,让程序始终处于读取消息的状态 }}
- 验证测试
操作生产者代码,表示向队列发送信息。
查看RabbitMQ控制台中的Queues内容
在RabbitMQ队列中启动消费者消费。
通过RabitMQ控制台查看Queues的内容;发现消息已被消费
简单模式的缺点:该模式是一对一的。一个生产者向一个队列发送信息,一个消费者从绑定队列中获取信息,因此耦合过高。如果多个消费者想要消费队列中的信息,就无法实现。