Springboot集成RabbitMQ死信队列的实现
作者:小伙子你那什么车啊 发布时间:2022-08-24 13:10:54
关于死信队列
在大多数的MQ中间件中,都有死信队列的概念。死信队列同其他的队列一样都是普通的队列。在RabbitMQ中并没有特定的“死信队列”类型,而是通过配置,将其实现。
当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange。当消息“死掉”后,会被自动路由到DLX Exchange的queue中。
什么样的消息会进入死信队列?
1.消息的TTL过期。
2.消费者对broker应答Nack,并且消息禁止重回队列。
3.Queue队列长度已达上限。
场景分析
以用户订单支付为场景。在各大电商平台上,订单的都有待支付时间,通常为30min。当用户超过30min未支付订单,该订单的状态应该会变成“超时取消”,或类似的状态值的改变。
如果不使用MQ,可以设计一个定时任务,定时查询数据库,判断订单的状态和支付时间是否已经到期,若到期则修改订单的状态。但显然,这不是一个很好的操作,频繁访问数据库,造成不必要的资源浪费。
使用MQ,我们可以在下单的时候,当订单数据入库后,发送一条Message到Queue中,并设置过期时间为30min或自定义的支付过期时间。
/**
* 发送带有过期时间的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 模拟,设置10S后消息过期
message.getMessageProperties().setExpiration("10000");
return message;
});
}
若30min后,还未有消费者(下游服务)消费这条消息,那么该条消息就会被路由到死信队列中。我们可以设置一个监听去监听死信队列,当收到死信队列的消息后,则根据消息数据,查询数据库订单状态是否还是待支付状态,若是,则修改成超时取消。
代码实现
以下是demo,未做服务的拆分,因此整个流程都是单个服务实现的,所以就没有下游服务,但并不影响整体业务。
RabbitMQConfig
将需要的交换机,队列,绑定都声明成SpringBean。Spring会自动创建这些到RabbitMQ服务中。
@Value注解部分都是配置文件exchange、queue、routingKey的名称。
/**
* @author wulei
*/
@Configuration
public class RabbitConfig {
@Value("${sunspring.order.exchange}")
private String orderExchange;
@Value("${sunspring.order.queue}")
private String orderQueue;
@Value("${sunspring.order.routingKey}")
private String orderRoutingKey;
@Value("${sunspring.dlx.exchange}")
private String dlxExchange;
@Value("${sunspring.dlx.queue}")
private String dlxQueue;
@Value("${sunspring.dlx.routingKey}")
private String dlxRoutingKey;
/**
* 声明死信队列
* @return DirectExchange
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
/**
* 声明死信队列
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
/**
* 绑定死信队列到死信交换机
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with(dlxRoutingKey);
}
/**
* 声明订单业务交换机
* @return DirectExchange
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
/**
* 声明订单业务队列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 绑定该队列到私信交换机
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
/**
* 绑定订单队列到订单交换机
* @return Binding
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with(orderRoutingKey);
}
}
sunspring.order.exchange=sunspring_order_exchange
sunspring.order.queue=sunspring_order_queue
sunspring.order.routingKey=sunspring.order
sunspring.dlx.exchange=sunspring_dlx_exchange
sunspring.dlx.queue=sunspring.dlx.queue
sunspring.dlx.routingKey=dlx
在声明业务队列时,创建了一个Map,并且put了两个值,这两个值就是死信队列的声明。
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:死信交换机的路由键,因为demo中两个交换机的类型都是direct的,因此路由键必须相同。
/**
* 声明订单业务队列
* @return Queue
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 绑定该队列到私信交换机
arguments.put("x-dead-letter-exchange",dlxExchange);
arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
return new Queue(orderQueue,true,false,false,arguments);
}
监控页面
在exchange列表中有刚刚创建的业务交换机sunspring_order_exchange和死信交换机
sunspring_dlx_exchange
在Queue列表中,有死信队列sunspring_dlx_queue和业务队列sunspring_order_queue
并且业务队列上有DLX标记,可见当前队列已经绑定了一个死信队列。DLK表示的路由键。
场景模拟
生产者
生产者发送了一个过期时间为10S的消息。
message.getMessageProperties().setExpiration(“10000”);
/**
* 发送带有过期时间的消息
*/
@GetMapping("/sendDlx")
public void sendDlx() {
Order order = new Order();
order.setItemId(1);
order.setStatus(1);
rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
JSON.toJSONString(order), message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setExpiration("10000");
return message;
});
}
sunspring_order_queue接受到了一条消息,当前消息的状态是ready的,表示没有任何消费者消费这条消息。
10s后,当前消息路由到了死信队列中,sunspring_order_queue消息数量变成0,sunspring_dlx_queue数量变成1。
消费者,设置死信队列监听
通过设置对死信队列的监听,可以发现,在Springboot启动之后,创建了对RabbitMQ的监听,死信队列的消息也立刻被消费了。
因此,我们可以监听死信队列,对未被消费的消息进行下一步操作。如场景分析中的更改订单状态。
@RabbitListener(queues = "sunspring.dlx.queue")
public void dlxListener(Message message,Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
//对消息进行业务处理....
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://guest@120.27.243.91:5672/, localPort= 13563]
{"itemId":1,"status":1}
来源:https://blog.csdn.net/shishishi777/article/details/99879419


猜你喜欢
- 百度是个好东西,这篇调用了百度的接口(当然大牛也可以自己写),人脸检测技术,所以使用的前提是有网的情况下。当然大家也可以去参考百度的文档。话
- Android XML設置屏幕方向(android:screenOrientation)详解注意:Android只支持270度旋
- 概述在Java环境下创建定时任务有多种方式:使用while循环配合 Thread.sleep(),虽然稍嫌粗陋但也勉强可用使用 Timer和
- 实践过程效果代码class BaseClass{ public class Win32
- 1.在res上面右键->New->Android resource directory2.点击之后,出现下图Resource t
- 本文实例讲述了Java实现的计时器【秒表】功能。分享给大家供大家参考,具体如下:应用名称:Java计时器用到的知识:Java GUI编程开发
- 前言在实际工作中,重试机制是一个很常见的场景,比如:发送消息失败,下载网络文件失败等…,因为这些错误可能是网络波动造成
- 单源最短路径问题,即在图中求出给定顶点到其它任一顶点的最短路径。在弄清楚如何求算单源最短路径问题之前,必须弄清楚最短路径的最优子
- 一、Jvm加载对象在说Java * 之前,还是要说一下Jvm加载对象的过程,这个依旧是理解 * 的基础性原理:Java类即源代码程序.j
- 开篇我们还是和原来一样,讲一讲做爬虫的思路以及需要准备的知识吧,高手们请直接忽略。首先我们来缕一缕思绪,想想到底要做什么,列个简单的需求。需
- idea的前后跳转箭头没有像eclipse一样显示在导航栏,很多时候找不到,记住快捷键:ctrl+alt+ 向左箭头或者向右箭头即可跳转。补
- 1.获取屏幕大小,以合理设定 按钮 大小及位置 DisplayMetrics dm = new DisplayMetrics(); getW
- 最近在读刘增辉老师所著的《MyBatis从入门到精通》一书,很有收获,于是将自己学习的过程以博客形式输出,如有错误,欢迎指正,如帮助到你,不
- 本文实例讲述了Android使用shape使组件呈现出特殊效果的方法。分享给大家供大家参考,具体如下:使用到的布局文件<?xml ve
- 先看下效果:两种需求场景:1.广告页3s后跳转到首页2.短信验证码60s倒计时第一种的话,根据需求我们可以知道,我们想要的效果就是3s结束做
- 问题引出:最近开了新项目,项目中用到了数据字典,列表查询数据返回的时候需要手动将code转换为name,到前台展示。项目经理表示可以封装一个
- Unsafe类是啥?Java最初被设计为一种安全的受控环境。尽管如此,Java HotSpot还是包含了一个“后门”,提供了一些可以直接操控
- 本文实例讲述了Android判断Activity是否在最上层的方法。分享给大家供大家参考,具体如下:private boolean isTo
- Cocos2d-x从2.x版本到上周刚刚才发布的Cocos2d-x 3.0 Final版,其引擎驱动核心依旧是一个单线程的“死循环”,一旦某
- 本文实例讲述了Java判断两个日期相差天数的方法。分享给大家供大家参考。具体如下:import java.util.Calendar;pub