SpringBoot集成RabbitMQ的方法(死信队列)
作者:小揪揪 发布时间:2023-06-10 15:12:06
标签:SpringBoot,集成,RabbitMQ
介绍
死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期
场景
1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理
使用
安装MQ
使用docker方式安装,选择带mangement的版本
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
访问 localhost: 15672,默认账号密码guest/guest
项目配置
(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
(3)队列配置
package com.df.ps.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MqConfig {
//time
@Value("${spring.df.buffered.min:120}")
private int springdfBufferedTime;
@Value("${spring.df.high-buffered.min:5}")
private int springdfHighBufferedTime;
@Value("${spring.df.low-buffered.min:120}")
private int springdfLowBufferedTime;
// 30min Buffered Queue
@Value("${spring.df.queue:spring-df-buffered-queue}")
private String springdfBufferedQueue;
@Value("${spring.df.topic:spring-df-buffered-topic}")
private String springdfBufferedTopic;
@Value("${spring.df.route:spring-df-buffered-route}")
private String springdfBufferedRouteKey;
// 5M Buffered Queue
@Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
private String springdfHighBufferedQueue;
@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
private String springdfHighBufferedTopic;
@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
private String springdfHighBufferedRouteKey;
// High Queue
@Value("${spring.df.high.queue:spring-df-high-queue}")
private String springdfHighQueue;
@Value("${spring.df.high.topic:spring-df-high-topic}")
private String springdfHighTopic;
@Value("${spring.df.high.route:spring-df-high-route}")
private String springdfHighRouteKey;
// 2H Low Buffered Queue
@Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
private String springdfLowBufferedQueue;
@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
private String springdfLowBufferedTopic;
@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
private String springdfLowBufferedRouteKey;
// Low Queue
@Value("${spring.df.low.queue:spring-df-low-queue}")
private String springdfLowQueue;
@Value("${spring.df.low.topic:spring-df-low-topic}")
private String springdfLowTopic;
@Value("${spring.df.low.route:spring-df-low-route}")
private String springdfLowRouteKey;
@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
Queue springdfBufferedQueue() {
int bufferedTime = 1000 * 60 * springdfBufferedTime;
return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
Queue springdfHighBufferedQueue() {
int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
Queue springdfHighQueue() {
return new Queue(springdfHighQueue, true);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
Queue springdfLowBufferedQueue() {
int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
Queue springdfLowQueue() {
return new Queue(springdfLowQueue, true);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
TopicExchange springdfBufferedTopic() {
return new TopicExchange(springdfBufferedTopic);
}
@Bean
Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
TopicExchange springdfHighBufferedTopic() {
return new TopicExchange(springdfHighBufferedTopic);
}
@Bean
Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
TopicExchange springdfHighTopic() {
return new TopicExchange(springdfHighTopic);
}
@Bean
Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
TopicExchange springdfLowBufferedTopic() {
return new TopicExchange(springdfLowBufferedTopic);
}
@Bean
Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
}
@Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
TopicExchange springdfLowTopic() {
return new TopicExchange(springdfLowTopic);
}
@Bean
Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(springdfHighQueue, springdfLowQueue);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {
MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
adapter.setDefaultListenerMethod("receive");
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
return adapter;
}
private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", topic);
args.put("x-dead-letter-routing-key", routeKey);
args.put("x-message-ttl", bufferedTime);
// 是否持久化
boolean durable = true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = false;
return new Queue(queueName, durable, exclusive, autoDelete, args);
}
}
消费者配置
package com.df.ps.mq;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import java.util.Map;
public class MqReceiver {
private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);
@Value("${high-retry:5}")
private int highRetry;
@Value("${low-retry:5}")
private int lowRetry;
@Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
private String springdfHighBufferedTopic;
@Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
private String springdfHighBufferedRouteKey;
@Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
private String springdfLowBufferedTopic;
@Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
private String springdfLowBufferedRouteKey;
private final RabbitTemplate rabbitTemplate;
@Autowired
public MqReceiver(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void receive(Object message) {
if (logger.isInfoEnabled()) {
logger.info("default receiver: " + message);
}
}
/**
* 消息从初始队列进入5分钟的高速缓冲队列
* @param message
*/
public void highReceiver(Object message){
ObjectMapper mapper = new ObjectMapper();
Map msg = mapper.convertValue(message, Map.class);
try{
logger.info("这里做消息处理...");
}catch (Exception e){
int times = msg.get("times") == null ? 0 : (int) msg.get("times");
if (times < highRetry) {
msg.put("times", times + 1);
rabbitTemplate.convertAndSend(springdfHighBufferedTopic,springdfHighBufferedRouteKey,message);
} else {
msg.put("times", 0);
rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
}
}
}
/**
* 消息从5分钟缓冲队列进入2小时缓冲队列
* @param message
*/
public void lowReceiver(Object message){
ObjectMapper mapper = new ObjectMapper();
Map msg = mapper.convertValue(message, Map.class);
try {
logger.info("这里做消息处理...");
}catch (Exception e){
int times = msg.get("times") == null ? 0 : (int) msg.get("times");
if (times < lowRetry) {
rabbitTemplate.convertAndSend(springdfLowBufferedTopic,springdfLowBufferedRouteKey,message);
}else{
logger.info("消息无法被消费...");
}
}
}
}
来源:https://segmentfault.com/a/1190000019042102


猜你喜欢
- 使用对象初始值设定项初始化对象可以使用对象初始值设定项以声明方式初始化类型对象,而无需显式调用类型的构造函数。下面的示例演示如何将对象初始值
- http interface从 Spring 6 和 Spring Boot 3 开始,Spring 框架支持将远程 HTTP 服务代理成带
- 前言本文主要给大家介绍了关于JDK8新增的原子性操作类LongAdder的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的
- 根据上下文环境,Java 的关键字 final 的含义有些微的不同,但通常它指的是“这是不能被改变的”。防止改变有两个原因:设计或效率。因为
- 1.概述数据库开发一直是JAVA开发的核心之一,作为现在JAVA EE的基石框架,Spring Boot自身携带了一个JDBCTemplat
- 日期格式化标准 DateTime 格式字符串如果格式字符串只包含下表列出的某个单个格式说明符,则它们被解释为标准格式说明符。如果指定的格式字
- 一:串口通信简介前段时间因为工作需要研究了一下android的串口通信,网上有很多讲串口通信的文章,我在做的时候也参考了很多文章,现在就将我
- Android ScrollView 下嵌套 ListView 或 GridView出现问题解决办法ScrollView 下嵌套 ListV
- android欢迎页自动跳转及手机网络检测,具体实现代码如下所示:在1.2s后从欢迎页(welcome)跳转到登陆页(Login_Activ
- 根据使用泛型位置的不同可以分为:声明侧泛型、使用侧泛型。声明侧的泛型信息被记录在Class文件的Constant pool中以Signatu
- 一、创建项目1.File->new->project;2.选择“Spring Initializr”,点击next;(jdk1.
- Mybatis属于半自动ORM,在使用这个框架中,工作量最大的就是书写Mapping的映射文件,由于手动书写很容易出错,我们可以利用Myba
- 今天本文与大家分享如何得到数组中的最大值和最小值的实例。很适合Java初学者复习数组的基本用法与流程控制语句的使用。具体如下:这个程序主要是
- 前言图形相交检测常常用在伤害判定,使用自定义的图形相交检测,可以在一定程度上控制性能。比如2D格斗游戏中使用的矩形包围盒(AABB),一些动
- Scala异常处理Scala是一种多范式的编程语言,支持面向对象和函数式编程。Scala也支持异常处理,即在程序运行过程中发生意外或错误时,
- 前言;Maven 作为经典的项目构建工具相信很多人已经用很久了,但如果体验过 Gradle,那感觉只有两个字“真香&am
- 本文实例为大家分享了java验证码生成的具体代码,供大家参考,具体内容如下简单验证码java实现--servlet类生成 验证码img,并写
- private void button1_Click(object sender, EventArgs e) {
- 这里使用Spring Boot 2.7.4版本,对应Spring Security 5.7.3版本本文样例代码地址: spring-secu
- 原理简介:zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,