关于SpringBoot整合RabbitMQ实现死信队列
作者:叫我二蛋 发布时间:2022-03-29 23:27:05
概念介绍
什么是死信
死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:
消费者使用basic.reject或basic.nack(重新排队参数设置为false)对消息进行否定确认。
消息到达生存时间还未被消费。
队列超过长度限制,消息被丢弃。
这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图
死信队列应用
作为消息可靠性的一个扩展。比如,在队列已满的情况下也不会丢失消息。
可以实现延迟消费功能。比如,订单15分钟内未支付。
注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:
维护多个队列,每个队列维护一个TTL时间。
使用延迟交换机。这种方式需要下载插件支持
工程搭建
环境说明
RabbitMQ环境
Java版本:JDK1.8
Maven版本:apache-maven-3.6.3
开发工具:IntelliJ IDEA
搭建步骤
1.创建SpringBoot项目。
2.pom.xml文件导入RabbitMQ依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.application.yml文件添加RabbitMQ配置。
spring:
# rabbitmq配置信息 RabbitProperties类
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
# 开启confirm机制
publisher-confirm-type: correlated
# 开启return机制
publisher-returns: true
#全局配置,局部配置存在就以局部为准
listener:
simple:
acknowledge-mode: manual # 手动ACK
实现死信
准备Exchange&Queue
@Configuration
public class RabbitMQConfig {
/**
* 正常队列
*/
public static final String EXCHANGE = "boot-exchange";
public static final String QUEUE = "boot-queue";
public static final String ROUTING_KEY = "boot-rout";
/**
* 死信队列
*/
public static final String DEAD_EXCHANGE = "dead-exchange";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead-rout";
/**
* 声明死信交换机
*
* @return
*/
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
}
/**
* 声明死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
/**
* 绑定死信的队列和交换机
*
* @param deadExchange
* @param deadQueue
* @return
*/
@Bean
public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
/**
* 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
*
* @return
*/
@Bean
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE).build();
}
/**
* 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
* 绑定死信交换机及路由key
*
* @return
*/
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
//给队列设置消息时长
//.ttl(10000)
//队列最大长度
.maxLength(1)
.build();
}
/**
* 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
*
* @param bootExchange
* @param bootQueue
* @return
*/
@Bean
public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}
监听死信队列
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
public void listener_dead(String msg, Channel channel, Message message) throws IOException {
System.out.println("死信接收到消息" + msg);
System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
System.out.println("messageID:" + message.getMessageProperties().getMessageId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
方式一、消费者拒绝&否认
拒绝消息
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
}
否认消息
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
方式二、超过消息TTL 发送消息时设置TTL
@SpringBootTest
public class Publisher {
@Autowired
private RabbitTemplate template;
/**
* 5秒未被消费会路由到死信队列
*/
@Test
public void publish_expir() {
template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
}
}
设置队列所有消息的TTL
更新RabbitMQConfig类中bootQueue() ,更新后需要删除队列,因为队列属性有更改。
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
//给队列设置消息时长
.ttl(10000)
.build();
}
方式三、超过队列长度限制
设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
.maxLength(1)
.build();
}
代码仓库 点我
来源:https://wangbinguang.blog.csdn.net/article/details/128304529


猜你喜欢
- 因为目前工程无法使用第三方,只能搞一个 * 缓存了 * 缓存分为内存缓存,本地缓存,网络缓存;缓存的步骤依次是网络,内存,本地,然后取的顺序为内
- 一些公共的模板############################################### 对于一些基本指令的添加####
- Q 1: 指定 SYSTEM_UI_FLAG_LAYOUT_HIDE_NAVIGATION 好像并不能隐藏导航栏和状态栏?A: 您需要告诉系
- 本文实例为大家分享了C#支付宝扫码支付示的具体代码,供大家参考,具体内容如下支付宝工具类using System; using System
- 1.切所有controller下的请求项目结构任意公共方法的执行:execution(public * (…))##public可以省略,
- 前言表单提交是最常见的数据提交方式,我们经常会填写表单信息,比如用户名,身份证,手机号等等,因此就会产生身份证是否合法,用户名是否为空,虽然
- 前言在windows平台下实现高性能网络服务器,iocp(完成端口)是唯一选择。编写网络服务器面临的问题有:1 快速接收客户端的连接。2 快
- /// <summary>/// 获取数据缓存/// </summary>/// <param name=&q
- Service概念及用途:Android中的服务,它与Activity不同,它是不能与用户交互的,不能自己启动的,运行在后台的程序,如果我们
- 最初的源文件样式如下:用默认的配置进行格式化之后如下:使用如下配置后,格式化之后的代码如下:最终修改成下面这样比较合适:来源:https:/
- 什么是雪花算法雪花算法的本质为生成一个64位长度的具有自增性的分布式全局唯一id。在64bits中,会对不同段的位进行划分。可分为:符号段时
- 气球状提示框的介绍和系统通知变化NotifyIcon控件表示系统右下角任务栏上的托盘图标,其ShowBalloonTip方法用于显示任务栏中
- 本文实例讲述了Android string.xml中的替换方法。分享给大家供大家参考,具体如下:在android的开发中,经常会遇见一句话,
- 记得上学的时候学习英语,每个英语老师说到英语翻译的时候都会说英语翻译要做到“信、达、雅”。如今做了一名程序员竟然体会我还是想用这三种境界来要
- C# 中的委托(Delegate)类似于 C 或 C++ 中函数的指针。委托(Delegate) 是存有对某个方法的引用的一种引用类型变量。
- 一.NET Remoting 介绍简介.NET Remoting与MSMQ不同,它不支持离线可得,另外只适合.NET平台的程序进行通信。它提
- 本文实例讲述了C#使用RichTextBox实现替换文字及改变字体颜色功能。分享给大家供大家参考,具体如下:替换文字private void
- Redis是一个缓存消息中间件及具有丰富特性的键值存储系统。Spring Boot为Jedis客户端库和由Spring Data Redis
- 简介相信大家有配置过多数据源,或者即将配置多数据的朋友们,会发现网上大概有以下几种方案:1. 使用 AOP 切片进行动态数据源切换2. 使用
- 这篇文章主要介绍了springmvc视图解析流程代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的