RocketMQ-延迟消息的处理流程介绍
作者:pigcoffee 发布时间:2023-07-28 01:58:59
标签:RocketMQ,延迟,消息
概述
RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。
Broker处理延迟消息
CommitLog.putMessage()
//获取消息的sysflag
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//非事务消息 或 已commit事务消息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery 判断消息是否设置延迟
if (msg.getDelayTimeLevel() > 0) {
//判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//延迟消息的topic为 SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//获取延迟级别,一个延迟级别对应一个Queue
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//消息原始的topic,queueid保存到消息的property中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息
2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息
3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX
5、获取延迟级别,一个延迟级别对应一个Queue
6、消息原始的topic,queueid保存到消息的property中
7、修改消息的topci、queueid
启动延迟消息定时任务
ScheduleMessageService.start()
延迟消息投递
来源:https://blog.csdn.net/hqwang4/article/details/100030974


猜你喜欢
- 骑士周游问题在8x8的国际棋盘上,按照马走日的规则,验证是否能够走遍棋盘。解题思路1、创建棋盘 chessBoard,是一个二维数组。2、将
- 如何解决yml没有spring小叶子标志我的idea springboot项目中有两个.yml文件,一个application.yml,一个
- 本文实例讲述了C#中实现一次执行多条带GO的sql语句。分享给大家供大家参考。具体如下:using System;using System.
- 废话少说,直接上代码:ehcache.xml 文件<?xml version="1.0" encoding=&qu
- 在没介绍正文之前,先给大家介绍下websocket的背景和原理:背景在浏览器中通过http仅能实现单向的通信,comet可以一定程度上模拟双
- 采取的方法是Fragment+FragmentTabHost组件来实现这种常见的app主页面的效果首先给出main.xml文件
- 本文实例讲述了C#遍历系统进程的方法。分享给大家供大家参考。具体实现方法如下:建立一个listBox将进程名称遍历进去this.listBo
- 1、研究背景 在当今信息社会发展中中,计算机科
- Java的NIO包中,有一个专门用于发送UDP数据包的类:DatagramChannel,UDP是一种无连接的网络协议,一般用于发送一些准确
- 这篇文章主要介绍了Java CPU性能分析工具代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 1、springboot controller 单例Spring中 controller默认是单例的,因为单例所以不是线程安全的。所以需要注
- C# 关于Invoke首先说下,invoke和begininvoke的使用有两种情况:control中的invoke、begininvoke
- 以在搜索框搜索时,自动补全为例:其中还涉及到一个词,Tokenizer:分词器,分解器。上效果图:MainActivity.java:pac
- 如今代码圈很多做网络爬虫的例子,今天小编给大家分享的是如何用C#做网络爬虫。注意这次的分享只是分享思路,并不是一整个例子,因为如果要讲解一整
- 前言传统的Restful API 存在诸多的问题,首先它无法控制返回的字段,前端也无法预判后端的返回结果,另外不同的返回结果对应不同的请求地
- ProgressBar进度条,分为旋转进度条和水平进度条,进度条的样式根据需要自定义,之前一直不明白进度条如何在实际项目中使用,网上演示进度
- 像360卫士的波浪球进度的效果,一般最常用的方法就是画线的方式,先绘sin线或贝塞尔曲线,然后从左到右绘制竖线,然后再裁剪圆区域。今天我这用
- 一,概念1,排序排序,就是使一串记录,按照其中的某个或某些关键字的大小,递增或递减的排列起来的操作。 平时的上下文中,如果提到排序,通常指的
- 对谷歌地图操作使用的是WebBrowser控件,通过对javascript的操作来实现对谷歌地图的各种操作,所以首先要创建一个html文件,
- name hobbyTom &nb