RocketMQ特性Broker存储事务消息实现
作者:奔跑的毛球 发布时间:2022-07-10 20:42:13
引言
在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()
方法执行的。
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
这里有三个核心的初始化变量
TransactionalMessageService
事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl
也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()
方法进行加载。
TransactionalMessageService
类定义如下。内部属性已加注释标明。
public interface TransactionalMessageService {
//用于保存Half事务消息
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
//删除事务消息
boolean deletePrepareMessage(MessageExt messageExt);
//提交事务消息
OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
//回滚事务消息
OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
//打开事务消息
boolean open();
//关闭事务消息
void close();
}
transactionalMessageCheckListener
事务消息回查 *
transactionalMessageCheckService
事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。
处理事务消息
当初始化完成之后,Broker就可以处理事务消息了。
Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,这和普通消息其实是一样的。
但是有两点针对事务消息的特殊处理:
第一处:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
//判断当前Broker配置是否支持事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
//保存Half信息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
第二处:
存储事务消息前的预处理,对应方法是
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//将原消息的topic保存在扩展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//将原消息的QueueId保存在扩展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//将原消息的SysFlag保存在扩展字段中
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//修改Queueid为0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
完成上述步骤之后,调用DefaultMessageStole.putMessage()
方法将其保存到CommitLog
中。
CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法对其进行处理。
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the consume queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费。
来源:https://juejin.cn/post/7132133887043436581
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 声明式事务很方便,尤其纯注解模式,仅仅几个注解就能控制事务了思考:这些注解都做了什么?好神奇!@EnableTransactionManag
- Spring Boot可以和大部分流行的测试框架协同工作:通过Spring JUnit创建单元测试;生成测试数据初始化数据库用于测试;Spr
- 1、将 Jmeter 下 extras 目录中 ant-jmeter-1.1.1.jar 包拷贝至 ant 安装目录下的lib目录中,否则会
- 缘起工作时使用java开发服务器后台,用Jersey写Restful接口,发现有一个Post方法始终获取不到参数,查了半天,发现时获取参数的
- @RequestBody配合@Valid校验入参参数自定义一个Controllerimport com.example.demo.pojo.
- 本文实例为大家分享了SpringBoot实现分页功能的具体代码,供大家参考,具体内容如下新建demo\src\main\java\com\e
- Java自定义注解一般使用场景为:自定义注解+ * 或者AOP,使用自定义注解来自己设计框架,使得代码看起来非常优雅。本文将先从自定义注解的
- 一、背景在开发过程中,我们的软件会面对不同的运行环境,比如开发环境、测试环境、生产环境,而我们的软件在不同的环境中,有的配置可能会不一样,比
- 1.首先,八种基本数据类型分别是:int、short、float、double、long、boolean、byte、char; &
- 实现Java多态性的时候,关于方法调用的优先级:我们这样假设下,super(超类)、this(当前类对象)、show(方法)、object(
- java模拟实现图书检索系统 (基础版),供大家参考,具体内容如下练习实现3个简单的功能,没有优化,可以根据需求,自行添加想要实现的功能。B
- [LeetCode] 205. Isomorphic Strings 同构字符串Given two strings s
- 上篇教程回顾ServerSocket --监听客户端的连接,他的作用主要是建立一个连接-ServerSocket -建立连接,拿到一个Soc
- OSS不支持通过一个网络地址来上传图片,所以若想将网络上的图片上传到OSS上需要走点弯路。1、通过链接将图片下载到本地的一个文件夹下面2、用
- 前言:什么是多数据源?最常见的单一应用中最多涉及到一个数据库,即是一个数据源(Datasource)。那么顾名思义,多数据源就是在一个单一应
- @Param注解导致分页失效—分页 * 问题描述在使用mybatis分页时,使用@Param注解传入了两个对象,分页失效,查询出的总是全部的
- 需求在配置类中,从application.properties中读取一个复杂list。如List<Person>或者初始化一个m
- 相信在做B/S模式的项目时,我们请求server端时通常遇到返回数据的处理,对返回数据的格式处理方式多样,随着JSON的流行,现在很多的项目
- 本文实例讲述了Java矩阵连乘问题(动态规划)算法。分享给大家供大家参考,具体如下:问题描述:给定n个矩阵:A1,A2,...,An,其中A
- 抽象方法与虚方法的区别先说两者最大的区别:抽象方法是需要子类去实现的。虚方法是已经实现了的,可以被子类覆盖,也可以不覆盖,取决于需求。因为抽