RocketMQ源码解析topic创建机制详解
作者:蚂蚁背大象 发布时间:2023-03-16 01:06:23
1. RocketMQ Topic创建机制
以下源码基于Rocket MQ 4.7.0
RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建
autoCreateTopicEnable=true/false
下面会结合源码来深度分析一下自动创建和手动创建的过程。
2. 自动Topic
默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中
// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
自动创建开关是下BrokerConfig类中有一个私有变量:
@ImportantField
private boolean autoCreateTopicEnable = true;
这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。
在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums()); //8
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。
然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
这里就说明了如何把每个Broker的系统自定义的Topic注册到NameServer。
接下来看在发送过程中如何从NameServer获取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略代码
//获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
}
通过DefaultMQProducerImpl.tryToFindTopicPublishInfo方法获取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//第一次从缓存中获取--肯定没有因为还没创建
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//从NameServer获取--也是没有,因为没有创建
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//第二次从这里获取
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
下面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
//省略代码
if (isDefault && defaultMQProducer != null) {
//使用默认的TBW102 Topic获取数据
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//这是正常的
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
//省略代码
}
如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。
这里会比较一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默认值和TBW102中的值哪个更小。
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
}
判断获取默认的是否存在,如果存在把当前的Topic的信息更新。
也就是把TBW102 Topic的数据更新为自动创建的数据。
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
更新本地的缓存。这样TBW102 Topic的负载和一些默认的路由信息就会被自己创建的Topic使用。这里就是整个自动创建的过程.
总结一下就是:通过使用系统内部的一个TBW102的Topic的配置来自动创建当前用户的要创建的自定义Topic。
3. 手动创建--预先创建
手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面创建Topic。
通过界面控制台创建
项目地址: github.com/apache/rock…
TopicController主要负责Topic的管理
@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
"clusterName or brokerName can not be all blank");
logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
topicService.createOrUpdate(topicCreateOrUpdateRequest);
return true;
}
然后通过MQAdminExtImpl.createAndUpdateTopicConfig方法来创建:
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
}
通过调用DefaultMQAdminExtImpl.createAndUpdateTopicConfig创建Topic
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
最后通过MQClientAPIImpl.createTopic创建Topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
来源:https://juejin.cn/post/7133413162048569374


猜你喜欢
- 1.限定字符串用 @ 符号加在字符串前面表示其中的转义字符“不”被处理。 如果我们写一个文件的路径,例如"D:/文本文件"
- @JsonInclude(JsonInclude.Include.NON_NULL)不起作用记录一下使用@JsonInclude(JsonI
- 本文实例讲述了Android中SeekBar和RatingBar用法。分享给大家供大家参考,具体如下:什么是SeekBar?可以拖动的进度条
- 混乱的URI编码 JavaScript中编码有三种方法:escape、encodeURI、encodeURIComponent C#中编码主
- 当两个线程竞争同一资源时,如果对资源的访问顺序敏感,就称存在竞态条件。导致竞态条件发生的代码区称作临界区。在临界区中使用适当的同步就可以避免
- 思路:先从集合中找出来顶级的菜单,然后遍历顶级菜单,找出每个顶级菜单的所有子菜单,然后判断当前需要排列的集合是否为空,如果不为空的话,就在遍
- 本文实例分析了C#实现的24点游戏。分享给大家供大家参考。具体如下:1. 24点游戏规则及算法规则:给出4个自然数,找出能够求出24的四则运
- 简介mutable(可变)和immutable(不可变)对象是我们在java程序编写的过程中经常会使用到的。可变类型对象就是说,对象在创建之
- 本文实例为大家分享了java获取不同路径的方法,供大家参考,具体内容如下思路:自定义Button获取DialogManager、AudioM
- 一、多线程的优缺点多线程的优点:1)资源利用率更好2)程序设计在某些情况下更简单3)程序响应更快多线程的代价:1)设计更复杂虽然有一些多线程
- WebView设置WebViewClient的方法shouldOverrideUrlLoading:在web页面里单击链接的时候,会自动调用
- C# 中可以将类、结构或接口的定义拆分到两个或多个源文件中,在类声明前添加partial关键字即可。1. 什么是局部类型?C# 2.0 引入
- 本文实例为大家分享了Android实现录制按钮的具体代码,供大家参考,具体内容如下初始化布局文件中参数private void initPa
- mapper.xml文件<?xml version="1.0" encoding="UTF-8"
- 以前编程的时候,遇到倒计时的功能时,经常自己去写,但其实Android已经帮封装好了一个倒计时类CountDownTimer,其实是将后台线
- 一、JAVA简要概述先说一下java之父,詹姆斯·高斯林这是一个爱喝咖啡而又强大的男人。再来看一下JAVA有多火在TIOBE排行榜上JAVA
- 本文研究的主要是Java虚拟机中gc日志的理解问题,具体如下。一、日志分析 理解GC日志是处理Java虚拟机内存问题的基本技能。通过在jav
- Java动态数组Arraylist存放自定义数据类型class Point{ int x; int y; public Point(int
- 一、总体概述官方文档:https://docs.devexpress.com/WindowsForms/8117/controls-and-
- 这篇讲解一下rocketMq的事务消息的原理在发送事务消息的时候,会加一个标识,表示这个消息是事务消息。broker接收到消息后,在我们之前