首页 > 图灵资讯 > 技术篇>正文

RabbitMQ 实现消息队列延迟

2023-05-04 10:45:00

1.概述

为了实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange但是RabitMQ版本必须是3.5.8以上支持插件,否则必须使用其死信队列功能。

2.安装RabbitMQ延迟插件
  • 检查插件的使用情况rabbitmq-plugins list用于查看RabbitMQ安装的插件。
rabbitmq-plugins list

检查RabbitMQ插件的安装情况

RabbitMQ 实现消息队列延迟_rabbitmq

  • 下载插件

如果没有安装插件,直接访问官网下载

https://www.rabbitmq.com/community-plugins.html

 

RabbitMQ 实现消息队列延迟_rabbitmq_02

RabbitMQ 实现消息队列延迟_java_03

  • 安装插件

下载后,将其复制到RabitMQ安装目录的plugins目录;并解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins

 

RabbitMQ 实现消息队列延迟_rabbitmq_04

打开cmd命令行窗口。如果系统配备了RabitMQ环境变量,则直接执行以下命令进行安装;否则,您需要进入RabitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

RabbitMQ 实现消息队列延迟_spring_05

3.实现RabitMQ消息队列延迟功能
  • pom.在xml配置信息文件中添加相关依赖文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.olive</groupId><artifactId>rabbitmq-spring-demo</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.7</version><relativePath /></parent><dependencies><!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency>    <groupId>org.eclipse.paho</groupId>    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>    <version>1.2.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build></project>
  • application.RabitMQ信息配置在yml配置文件中
server:  port: 8080spring:  #给项目起个名字  application:    name: rabbitmq-spring-demo  #rabitMq配置 服务器  rabbitmq:    host: 127.0.0.1    port: 5672    username: admin    password: admin123    #虚拟host。使用server默认hosttt可以不设置;不同虚拟路径下的队列是隔离的    virtual-host: /
  • RabitMQ配置类别
package com.olive.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * RabitMQ配置类别 **/@Configurationpublic class RabbitMqConfig {public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";public static final String DELAY_QUEUE_NAME = "delay_queue_name";public static final String DELAY_ROUTING_KEY = "delay_routing_key";@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Queue queue() {Queue queue = new Queue(DELAY_QUEUE_NAME, true);return queue;}@Beanpublic Binding binding(Queue queue, CustomExchange delayExchange) {return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}}
  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.olive.config.RabbitMqConfig;/** * 消息发送者 **/@Servicepublic class CustomMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:" + sdf.format(new Date()));rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_ROUTING_KEY, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息延迟5秒message.getMessageProperties().setHeader("x-delay", 5000);return message;}});}}
  • 接收消息
package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.olive.config.RabbitMqConfig;/** * 消息接收者 **/@Componentpublic class CustomMessageReceiver {@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(sdf.format(new Date()) + msg);System.out.println("Receiver:执行取消订单”);}}
  • 测试验证
package com.olive.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.olive.service.CustomMessageSender;@RestControllerpublic class DelayMessageController {@Autowiredprivate CustomMessageSender customMessageSender;@GetMapping("/sendMessage")public String sendMessage() {// customessagesender发送消息.sendMsg(”你已经支付了超时费,取消订单通知!");return "success";}}

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

RabbitMQ 实现消息队列延迟_spring_06

上一篇 SpringBoot RabbitMQ死信队列
下一篇 SpringCloud Stream集成RabbitMQ

文章素材均来源于网络,如有侵权,请联系管理员删除。