RocketMQ特性Broker存储事务消息实现
作者:奔跑的毛球 发布时间:2022-07-10 20:42:13
引言
在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()
方法执行的。
private void initialTransaction() {
this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
if (null == this.transactionalMessageService) {
this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
}
this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
if (null == this.transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
}
this.transactionalMessageCheckListener.setBrokerController(this);
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
这里有三个核心的初始化变量
TransactionalMessageService
事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl
也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()
方法进行加载。
TransactionalMessageService
类定义如下。内部属性已加注释标明。
public interface TransactionalMessageService {
//用于保存Half事务消息
PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
//删除事务消息
boolean deletePrepareMessage(MessageExt messageExt);
//提交事务消息
OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
//回滚事务消息
OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
//打开事务消息
boolean open();
//关闭事务消息
void close();
}
transactionalMessageCheckListener
事务消息回查 *
transactionalMessageCheckService
事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。
处理事务消息
当初始化完成之后,Broker就可以处理事务消息了。
Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor
,这和普通消息其实是一样的。
但是有两点针对事务消息的特殊处理:
第一处:
在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
中:
//获取扩展字段的值,若是该值为true则为事务消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) {
//判断当前Broker配置是否支持事务消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
//保存Half信息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
第二处:
存储事务消息前的预处理,对应方法是
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//将原消息的topic保存在扩展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//将原消息的QueueId保存在扩展字段中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//将原消息的SysFlag保存在扩展字段中
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//修改topic的值为RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//修改Queueid为0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
完成上述步骤之后,调用DefaultMessageStole.putMessage()
方法将其保存到CommitLog
中。
CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
方法对其进行处理。
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the consume queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费。
来源:https://juejin.cn/post/7132133887043436581


猜你喜欢
- 本文列举了我在周围同事的Java代码中看到的一些比较典型的错误。显然,静态代码分析(我们团队用的是qulice)不可能发现所有的问题,这也是
- ***Source URL: http://i.yesky.com/bbs/jsp/view.jsp?articleID=889992&am
- webflux过滤器(RouterFunction实现)相关类与接口HandlerFiterFunction@FunctionalInter
- 代码入下:import java.io.*; public class Practice { publ
- 这篇效果和上一篇:https://www.jb51.net/article/100638.htm的效果是一样的,但是不再在OnTouchEv
- 简介目的:Optional的出现主要是为了解决null指针问题,也叫NPE(NullPointerException)外形:Optional
- 今天给大家介绍一下一个web 中经常会用到的截图(如:头像等)工具:Jcrop演示项目结构:效果图:这个很有用:看到这些,大家也想自己试试吧
- 实例如下:private bool creatExcel(string xlsfile) { &nb
- Spring boot + Spring data jpa + Thymeleaf批量插入 + POI读取 + 文件上传pom.xml:&l
- 上一篇我们学习了自定义ViewGroup的基本步骤,并做了一个CustomGridLayout的实例,这篇我们继续来说说自定义ViewGro
- 1.try-catch异常处理说明Java提供try和catch块来处理异常,try块用于包含可能出错的代码。catch块用于处理try块中
- Service概念及用途:Android中的服务,它与Activity不同,它是不能与用户交互的,不能自己启动的,运行在后台的程序,如果我们
- 这个例子用于演示在Spring Boot应用中如何验证Web 应用的输入,我们将会建立一个简单的Spring MVC应用,来读取用户输入并使
- 本文实例讲述了C++判断pe文件的方法。分享给大家供大家参考。具体实现方法如下:#include <afxdlgs.h>是为了使
- 本文实例讲述了C#实现简单屏幕监控的方法。分享给大家供大家参考。具体如下:这是一段C#编写的屏幕监控代码,可以自动对屏幕进行截图,软件自身隐
- 自己整理了 spring boot 结合 Redis 的工具类引入依赖<dependency> <groupI
- 本文主要介绍了spring-boot-maven-plugin报红解决方案,亲测有效,具体如下:<?xml version="
- 动态方法就是一个Action对应多个请求,减少Action的数量1、指定method属性<action name="addA
- 在 C# 中反射技术应用广泛,至于什么是反射.........你如果不了解的话,请看下段说明,否则请跳过下段。广告一下:喜欢我文章的朋友请关
- app_main上一篇文章:# Android 10 启动分析之servicemanager篇 (二)在init篇中有提到,init进程会在