RabbitMQ死信机制实现延迟队列的实战
作者:wxd_1024 发布时间:2023-10-10 18:41:57
延迟队列
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
应用场景
三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果。实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列没有设置消费者,过了有效期后消息会重新路由到指定的的队列,有消费者去执行订单查询。
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)
Time To Live(TTL)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(死信)RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
<!-- 将消息放入此队列里,此队列设置过期时间,不制造消费者让其过期,过期后变成死信,消息会放入指定的新队列里,实现消息的延迟消费 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">${qrcode.expire.icbc}</value>
</entry>
<!--消息过期根据重新路由 -->
<entry key="x-dead-letter-exchange" value="directExchange"/>
<entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
</rabbit:queue-arguments>
</rabbit:queue>
B: 对消息进行单独设置,每条消息TTL可以不同。
<!-- 将消息放入此队列里,次队列设置过期时间,不制造消费者让其过期,过期后变成死信,消息会放入指定的新队列里,实现消息的延迟消费 -->
<rabbit:queue name="paycenter.scanpay.orderquery.delay.icbc" durable="true" auto-delete="false" exclusive="false" >
<rabbit:queue-arguments>
<!--消息过期根据重新路由 -->
<entry key="x-dead-letter-exchange" value="directExchange"/>
<entry key="x-dead-letter-routing-key" value="paycenter.scanpay.orderquery"/>
</rabbit:queue-arguments>
</rabbit:queue>
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
package com.emax.paycenter.mq.pruducer;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl;
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(ttl.toString());
return message;
}
}
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:指定routing-key发送
队列出现dead letter的情况有:
消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。
注意一:ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。否则会报错。
注意二:消费者中,抛异常了没处理,会一直重复消费
注意三:下面的代码我模拟了1-10号消息,消息的内容里面是1-10。过期的时间是10-1秒。这里要注意,虽然10是第一个发送,但是它过期的时间最长。
过了10s以后,消费者开始收到数据,但是它是一次性收到如下结果:10、9 、8 、7 、6、5 、4 、3 、2 、1
Consumer第一个收到的还是10。10是第一个放进队列,但是它的过期时间最长。所以由此可见,即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。
所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。(也可以考虑缓存队列,DelayQueue实现定时延迟执行任务,但是也有缺点:就是项目重启缓存里的数据就会丢失,DelayQueue的使用详见其他博文)
for(int i = 10; i>0; i-- ){
amqpTemplate.convertAndSend(mqMessage.getExchange(), mqMessage.getRoutingKey(), result, new ExpirationMessagePostProcessor(expireTime));
}
来源:https://blog.csdn.net/wxd_1024/article/details/83549967


猜你喜欢
- 在拿到一个 Stream 如何优雅将这个 Stream 保存到代码最优雅的方法应该是通过 CopyTo 或 CopyToAsync 的方法u
- springboot加载yml文件获不到值今天使用spring boot读取yml文件,这种多层嵌套的竟然无法读取到(value注解spri
- 一、前言我们在日常学习中,对一个java代码有问题,不知道jvm内部怎么进行解析的时候;有个伟大壮举就是反编译,这样就可以看到jvm内部怎么
- 背景最近再做一个需求,就是对站点的一些事件进行埋点,说白了就是记录用户的访问行为。那么这些数据怎么保存呢,人家点一下保存一下?显然不合适,肯
- C#实现的鼠标钩子,可以获取鼠标在屏幕中的坐标,记得要以管理员权限运行才行using System;using System.Collect
- 1.launch启动流程已知协程的启动方式之一是Globalscope.launch,那么Globalscope.launch的流程是怎样的
- 上一篇文章:Android 10 启动分析之Init篇 (一)在前文提到,init进程会在在Trigger 为init的Action中,启动
- 存储结构二叉树是一种特殊的树,给个结点最多有两个子节点,并且子节点有左右之分,并且兄弟,父亲,孩子可以很方便的通过编号得到1.在二叉树的第i
- 声明:下面的实例全部在linux下尝试,window下未尝试。有兴趣者可以试一下。文章针c初学者。c语言的强符号和弱符号是c初学者经常容易犯
- 在Springboot中默认的静态资源路径有:classpath:/METAINF/resources/,classpath:/resour
- 结束firefox的进程,一句代码就够了,如下:Runtime.getRuntime().exec("taskkill /F /I
- 本文实例为大家分享了java实现猜拳小游戏的具体代码,供大家参考,具体内容如下实现下图要求public class User {privat
- 一、循环语句的几种语法语法:for循环格式:for(初始化语句;条件判断;递进语句){循环体;}while循环格式:初始化语句;while(
- 在Excel中如果能够将具有多级明细的数据进行分组显示,可以清晰地展示数据表格的整体结构,使整个文档具有一定层次感。根据需要设置显示或者隐藏
- 一、关键字分类C语言一共多少个关键字呢?一般的书上,都是32个(包括本书),但是这个都是C90(C89)的标准。其实C99后又新增了5个关键
- 前言最近碰到了Mybatis一对多查询的场景,在这里总结对比下常见的两种实现方式。本文以常见的订单表和订单详情表来举例说明;数据库表准备订单
- 一些Java项目中在mybatis与spring整合中有MapperScannerConfigurer的使用,该类通过反向代理自动生成基于接
- spring-MVC实现简单的登录功能,供大家参考,具体内容如下今天我学习了spring-MVC实现简单的登录功能,本篇博客就讲解如何使用s
- 生命太短暂,不要去做一些根本没有人想要的东西。本文已被 https://www.yourbatman.cn 收录,里面一并有Spring技术
- 有了前几篇的基础,相信大家对于Struts2已经有了一个很不错的认识,本篇我将为大家介绍一些关于Action接收参数的三种形式,以及简单的表