RabbitMQ延迟队列及消息延迟推送实现详解
作者:海向 发布时间:2023-11-15 15:31:31
这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
应用场景
目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:
淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
12306 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。
在上面两种场景中,如果我们使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:
使用 redis 给订单设置过期时间,最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。
消息延迟推送的实现
在 RabbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列
在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件,可以下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载
首先我们创建交换机和消息队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
public static final String LAZY_QUEUE = "MQ.LazyQueue";
public static final String LAZY_KEY = "lazy.#";
@Bean
public TopicExchange lazyExchange(){
//Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Queue lazyQueue(){
return new Queue(LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding(){
return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
}
我们在 Exchange 的声明中可以设置exchange.setDelayed(true)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。
//Map<String, Object> pros = new HashMap<>();
//设置交换机支持延迟消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延迟时间。
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//confirmCallback returnCallback 代码省略,请参照上一篇
public void sendLazy(Object message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("12345678909"+new Date());
//发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(6000);
return message;
}
}, correlationData);
}
}
我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于我们手动设置 header
message.getMessageProperties().setHeader("x-delay", "6000");
/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
*/
public void setDelay(Integer delay) {
if (delay == null || delay < 0) {
this.headers.remove(X_DELAY);
}
else {
this.headers.put(X_DELAY, delay);
}
}
消费端进行消费
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class MQReceiver {
@RabbitListener(queues = "MQ.LazyQueue")
@RabbitHandler
public void onLazyMessage(Message msg, Channel channel) throws IOException{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("lazy receive " + new String(msg.getBody()));
}
测试结果
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
@Autowired
private MQSender mqSender;
@Test
public void sendLazy() throws Exception {
String msg = "hello spring boot";
mqSender.sendLazy(msg + ":");
}
}
果然在 6 秒后收到了消息 lazy receive hello spring boot:
来源:https://www.cnblogs.com/haixiang/p/10966985.html


猜你喜欢
- Java 定时器在JAVA中实现定时器功能要用的二个类是Timer,TimerTaskTimer类是用来执行任务的类,它接受一个
- 本文实例为大家分享了OpenCV+Qt实现图像处理操作的具体代码,供大家参考,具体内容如下一、目标Qt界面实现 雪花屏 高斯模糊 中值滤波
- Aop什么是Aop?AOP就是面向切面编程,通过预编译方式以及运行期间的 * 技术来实现程序的统一维护功能。什么是切面,我理解的切面就是两
- 在搭建Spring Cloud Eureka环境前先要了解整个架构的组成,常用的基础模式如下图:服务提供者:将springboot服务编写好
- 前面在学习鸿洋大神的一些自定义的View文章,看到了自定义ViewGroup实现浮动标签,初步看了下他的思路以及结合自己的思路完成了自己的浮
- 【前言】AOP(Aspect Orient Programming),我们一般称为面向方面(切面)编程,作为面向对象的一种补充,用于处理系统
- 注意是maven的webapp:选择maven下一步下一步。maven下载过慢在setting中加入镜像。 我也有疑问这是什么鬼格式,但是证
- jmap是java自带的工具1. 查看整个JVM内存状态jmap -heap [pid]2. 查看JVM堆中对象详细占用情况jmap -hi
- SpringBoot控制配置类加载顺序@Configuration配置被加载进容器的方式大体上可分为3种手动:构建ApplicationCo
- 定义:当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知它的依赖对象。特
- 一、redis发布订阅简介Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。可以参
- 标识符Java 对各种变量、方法和类等要素命名时使用的字符序列称为标识符技巧:凡是自己可以起名字的地方都叫标识符定义合法标识符规则:由26个
- 本文实例讲述了Java简单实现约瑟夫环算法。分享给大家供大家参考,具体如下:1.算法背景:罗马人攻占了乔塔帕特,41人藏在一个山洞中躲过了这
- Http协议的重要性相信不用我多说了,HttpClient相比传统JDK自带的URLConnection,增加了易用性和灵活性(具体区别,日
- JenkinsJenkins是一个开源的、可扩展的持续集成、交付、部署的基于web界面的平台。允许持续集成和持续交付项目,无论用的是什么平台
- 一,引入dll1.ServiceStack.Common.dll2.ServiceStack.Interfaces.dll3.Service
- java.lang.NoSuchMethodException: com.sun.proxy.$Proxy58.list错误解决办法玩web
- 本文实例讲述了C#适用于like语句的SQL格式化函数,分享给大家供大家参考。具体实现代码如下:/// <summary> //
- Spring Aop的原理Spring的AOP就是通过 * 实现的。当为某个Bean或者某些Bean配置切面时,Spring会为其创建代理
- pom.xml配置<dependency> <groupId>org.springframework.