软件编程
位置:首页>> 软件编程>> java编程>> RocketMQ源码解析broker 启动流程

RocketMQ源码解析broker 启动流程

作者:hsfxuebao  发布时间:2022-12-25 10:50:54 

标签:RocketMQ,broker,启动

1. 启动入口

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

前面我们已经分析完了NameServerproducer,从本文开始,我们将分析Broker

broker的启动类为org.apache.rocketmq.broker.BrokerStartup,代码如下:

public class BrokerStartup {
   ...
   public static void main(String[] args) {
       start(createBrokerController(args));
   }
   ...
}

main()方法中,仅有一行代码,这行代码包含了两个操作:

  • createBrokerController(...):创建BrokerController

  • start(...):启动Broker

接下来我们就来分析这两个操作。

2. 创建BrokerController

创建BrokerController的方法为BrokerStartup#createBrokerController,代码如下:

/**
* 创建 broker 的配置参数
*/
public static BrokerController createBrokerController(String[] args) {
   ...
   try {
       //解析命令行参数
       Options options = ServerUtil.buildCommandlineOptions(new Options());
       commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
           new PosixParser());
       if (null == commandLine) {
           System.exit(-1);
       }
       // 处理配置
       final BrokerConfig brokerConfig = new BrokerConfig();
       final NettyServerConfig nettyServerConfig = new NettyServerConfig();
       final NettyClientConfig nettyClientConfig = new NettyClientConfig();
       // tls安全相关
       nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
           String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
       // 配置端口
       nettyServerConfig.setListenPort(10911);
       // 消息存储的配置
       final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
       ...
       // 将命令行中的配置设置到brokerConfig对象中
       MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
       // 检查环境变量:ROCKETMQ_HOME
       if (null == brokerConfig.getRocketmqHome()) {
           System.out.printf("Please set the %s variable in your environment to match
               the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
           System.exit(-2);
       }
       //省略一些配置
       ...
       // 创建 brokerController
       final BrokerController controller = new BrokerController(
           brokerConfig,
           nettyServerConfig,
           nettyClientConfig,
           messageStoreConfig);
       controller.getConfiguration().registerConfig(properties);
       // 初始化
       boolean initResult = controller.initialize();
       if (!initResult) {
           controller.shutdown();
           System.exit(-3);
       }
       // 关闭钩子,在关闭前处理一些操作
       Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
           private volatile boolean hasShutdown = false;
           private AtomicInteger shutdownTimes = new AtomicInteger(0);
           @Override
           public void run() {
               synchronized (this) {
                   if (!this.hasShutdown) {
                       ...
                       // 这里会发一条注销消息给nameServer
                       controller.shutdown();
                       ...
                   }
               }
           }
       }, "ShutdownHook"));
       return controller;
   } catch (Throwable e) {
       e.printStackTrace();
       System.exit(-1);
   }
   return null;
}

这个方法的代码有点长,但功能并不多,总的来说就三个功能:

  • 处理配置:主要是处理nettyServerConfignettyClientConfig的配置,这块就是一些配置解析的操作,处理方式与NameServer很类似,这里就不多说了。

  • 创建及初始化controller:调用方法controller.initialize(),这块内容我们后面分析。

  • 注册关闭钩子:调用Runtime.getRuntime().addShutdownHook(...),可以在jvm进程关闭前进行一些操作。

2.1 controller实例化

BrokerController的创建及初始化是在BrokerStartup#createBrokerController方法中进行,我们先来看看它的构造方法:

public BrokerController(
   final BrokerConfig brokerConfig,
   final NettyServerConfig nettyServerConfig,
   final NettyClientConfig nettyClientConfig,
   final MessageStoreConfig messageStoreConfig
) {
   // 4个核心配置信息
   this.brokerConfig = brokerConfig;
   this.nettyServerConfig = nettyServerConfig;
   this.nettyClientConfig = nettyClientConfig;
   this.messageStoreConfig = messageStoreConfig;
   // 管理consumer消费消息的offset
   this.consumerOffsetManager = new ConsumerOffsetManager(this);
   // 管理topic配置
   this.topicConfigManager = new TopicConfigManager(this);
   // 处理 consumer 拉消息请求的
   this.pullMessageProcessor = new PullMessageProcessor(this);
   this.pullRequestHoldService = new PullRequestHoldService(this);
   // 消息送达的 *
   this.messageArrivingListener
       = new NotifyMessageArrivingListener(this.pullRequestHoldService);
   ...
   // 往外发消息的组件
   this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
   ...
}

BrokerController的构造方法很长,基本都是一些赋值操作,代码中已列出关键项,这些包括:

  • 核心配置赋值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四个配置

  • ConsumerOffsetManager:管理consumer消费消息位置的偏移量,偏移量表示消费者组消费该topic消息的位置,后面再消费时,就从该位置后消费,避免重复消费消息,也避免了漏消费消息。

  • topicConfigManagertopic配置管理器,就是用来管理topic配置的,如topic名称,topic队列数量

  • pullMessageProcessor:消息处理器,用来处理消费者拉消息

  • messageArrivingListener:消息送达的 * ,当生产者的消息送达时,由该 * 监听

  • brokerOuterAPI:往外发消息的组件,如向NameServer发送注册/注销消息

以上这些组件的用处,这里先混个脸熟,我们后面再分析。

2.2 初始化controller

我们再来看看初始化操作,方法为BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {
   // 加载配置文件中的配置
   boolean result = this.topicConfigManager.load();
   result = result && this.consumerOffsetManager.load();
   result = result && this.subscriptionGroupManager.load();
   result = result && this.consumerFilterManager.load();
   if (result) {
       try {
           // 消息存储管理组件,管理磁盘上的消息
           this.messageStore =
               new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
                   this.messageArrivingListener, this.brokerConfig);
           // 启用了DLeger,就创建DLeger相关组件
           if (messageStoreConfig.isEnableDLegerCommitLog()) {
               ...
           }
           // broker统计组件
           this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
           //load plugin
           MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
               brokerStatsManager, messageArrivingListener, brokerConfig);
           this.messageStore = MessageStoreFactory.build(context, this.messageStore);
           this.messageStore.getDispatcherList().addFirst(
               new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
       } catch (IOException e) {
           result = false;
           log.error("Failed to initialize", e);
       }
   }
   // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息
   result = result && this.messageStore.load();
   if (result) {
       // 处理 nettyServer
       this.remotingServer = new NettyRemotingServer(
           this.nettyServerConfig, this.clientHousekeepingService);
       NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
       fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
       this.fastRemotingServer = new NettyRemotingServer(
           fastConfig, this.clientHousekeepingService);
       // 创建线程池start... 这里会创建多种类型的线程池
       ...
       // 处理consumer pull操作的线程池
       this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
           this.brokerConfig.getPullMessageThreadPoolNums(),
           this.brokerConfig.getPullMessageThreadPoolNums(),
           1000 * 60,
           TimeUnit.MILLISECONDS,
           this.pullThreadPoolQueue,
           new ThreadFactoryImpl("PullMessageThread_"));
       ...
       // 创建线程池end...
       // 注册处理器
       this.registerProcessor();
       // 启动定时任务start... 这里会启动好多的定时任务
       ...
       // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上
       this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
               try {
                   BrokerController.this.consumerOffsetManager.persist();
               } catch (Throwable e) {
                   log.error("schedule persist consumerOffset error.", e);
               }
           }
       }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
       ...
       // 启动定时任务end...
       ...
       // 开启 DLeger 的一些操作
       if (!messageStoreConfig.isEnableDLegerCommitLog()) {
           ...
       }
       // 处理tls配置
       if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
           ...
       }
       // 初始化一些操作
       initialTransaction();
       initialAcl();
       initialRpcHooks();
   }
   return result;
}

这个还是很长,关键部分都做了注释,该方法所做的工作如下:

  • 加载配置文件中的配置

  • 赋值与初始化操作

  • 创建线程池

  • 注册处理器

  • 启动定时任务

这里我们来看 * 册处理器的操作this.registerProcessor():

2.2.1 注册处理器:BrokerController#registerProcessor

this.registerProcessor()实际调用的方法是BrokerController#registerProcessor,代码如下:

public void registerProcessor() {
   /**
    * SendMessageProcessor
    */
   SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
   sendProcessor.registerSendMessageHook(sendMessageHookList);
   sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
   this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,
       this.sendMessageExecutor);
   this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
       this.sendMessageExecutor);
   this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
       this.sendMessageExecutor);
   this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
       this.sendMessageExecutor);
   ...
   /**
    * PullMessageProcessor
    */
   this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
       this.pullMessageExecutor);
   this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
   /**
       * ReplyMessageProcessor
       */
   ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
   replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
   ...
}

这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理producer/consumer的消息时,就会用到这些处理器,这里先不展开分析。

2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor

我们来看下remotingServer注册处理器的操作,方法为NettyRemotingServer#registerProcessor

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
   ...
   @Override
   public void registerProcessor(int requestCode, NettyRequestProcessor processor,
           ExecutorService executor) {
       ExecutorService executorThis = executor;
       if (null == executor) {
           executorThis = this.publicExecutor;
       }
       Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor,
               ExecutorService>(processor, executorThis);
       // 注册到processorTable 中
       this.processorTable.put(requestCode, pair);
   }
   ...
}

最终,这些处理器注册到了processorTable中,它是NettyRemotingAbstract的成员变量,定义如下:

HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

这是一个hashMap的结构,keycodevaluePair,该类中有两个成员变量:NettyRequestProcessorExecutorServicecodeNettyRequestProcessor的映射关系就是在hashMap里存储的。

2.3 注册关闭钩子:Runtime.getRuntime().addShutdownHook(...)

接着我们来看看注册关闭钩子的操作:

// 关闭钩子,在关闭前处理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
   private volatile boolean hasShutdown = false;
   private AtomicInteger shutdownTimes = new AtomicInteger(0);
   @Override
   public void run() {
       synchronized (this) {
           if (!this.hasShutdown) {
               ...
               // 这里会发一条注销消息给nameServer
               controller.shutdown();
               ...
           }
       }
   }
}, "ShutdownHook"));

跟进BrokerController#shutdown方法:

public void shutdown() {
   // 调用各组件的shutdown方法
   ...
   // 发送注销消息到NameServer
   this.unregisterBrokerAll();
   ...
   // 持久化consumer的消费偏移量
   this.consumerOffsetManager.persist();
   // 又是调用各组件的shutdown方法
   ...

这个方法里会调用各组件的shutdown()方法、发送注销消息给NameServer、持久化consumer的消费偏移量,这里我们主要看发送注销消息的方法BrokerController#unregisterBrokerAll:

private void unregisterBrokerAll() {
   // 发送一条注销消息给nameServer
   this.brokerOuterAPI.unregisterBrokerAll(
       this.brokerConfig.getBrokerClusterName(),
       this.getBrokerAddr(),
       this.brokerConfig.getBrokerName(),
       this.brokerConfig.getBrokerId());
}

继续进入BrokerOuterAPI#unregisterBrokerAll

public void unregisterBrokerAll(
   final String clusterName,
   final String brokerAddr,
   final String brokerName,
   final long brokerId
) {
   // 获取所有的 nameServer,遍历发送注销消息
   List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
   if (nameServerAddressList != null) {
       for (String namesrvAddr : nameServerAddressList) {
           try {
               this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
               log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
           } catch (Exception e) {
               log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
           }
       }
   }
}

这个方法里,会获取到所有的nameServer,然后逐个发送注销消息,继续进入BrokerOuterAPI#unregisterBroker方法:

public void unregisterBroker(
   final String namesrvAddr,
   final String clusterName,
   final String brokerAddr,
   final String brokerName,
   final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
       InterruptedException, MQBrokerException {
   UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
   requestHeader.setBrokerAddr(brokerAddr);
   requestHeader.setBrokerId(brokerId);
   requestHeader.setBrokerName(brokerName);
   requestHeader.setClusterName(clusterName);
   // 发送的注销消息:RequestCode.UNREGISTER_BROKER
   RemotingCommand request = RemotingCommand.createRequestCommand(
           c, requestHeader);
   RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
   assert response != null;
   switch (response.getCode()) {
       case ResponseCode.SUCCESS: {
           return;
       }
       default:
           break;
   }
   throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}

最终调用的是RemotingClient#invokeSync进行消息发送,请求codeRequestCode.UNREGISTER_BROKER,这就与NameServer接收broker的注销消息对应上了。

3. 启动Broker:start(...)

我们再来看看Broker的启动流程,处理方法为BrokerController#start

public void start() throws Exception {
   // 启动各组件
   // 启动消息存储相关组件
   if (this.messageStore != null) {
       this.messageStore.start();
   }
   // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息
   if (this.remotingServer != null) {
       this.remotingServer.start();
   }
   ...
   // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务
   if (this.brokerOuterAPI != null) {
       this.brokerOuterAPI.start();
   }
   ...
   // broker 核心的心跳注册任务
   this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
       @Override
       public void run() {
           try {
               BrokerController.this.registerBrokerAll(true, false,
                   brokerConfig.isForceRegister());
           } catch (Throwable e) {
               log.error("registerBrokerAll Exception", e);
           }
       }
       // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次
   }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
           TimeUnit.MILLISECONDS);
   ...
}

这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:

  • messageStore:消息存储组件,在这个组件里,会启动消息存储相关的线程,如消息的投递操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等

  • remotingServernetty服务,用来接收请求消息,如producer发送过来的消息

  • brokerOuterAPI:也是一个netty服务,用来对外发送消息,如向nameServer上报心跳消息

  • 启动定时任务:brokernameServer发送注册消息

这里我们重点来看定时任务是如何发送心跳发送的。

处理注册消息发送的时间间隔如下:

Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.

处理消息注册的方法为BrokerController#registerBrokerAll(...),代码如下:

public synchronized void registerBrokerAll(final boolean checkOrderConfig,
       boolean oneway, boolean forceRegister) {
   TopicConfigSerializeWrapper topicConfigWrapper
           = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
   // 处理topic相关配置
   if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
       || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
       ...
   }
   // 这里会判断是否需要进行注册
   if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
       this.getBrokerAddr(),
       this.brokerConfig.getBrokerName(),
       this.brokerConfig.getBrokerId(),
       this.brokerConfig.getRegisterBrokerTimeoutMills())) {
       // 进行注册操作    
       doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
   }
}

这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为BrokerController#needRegister, 代码如下:

private boolean needRegister(final String clusterName,
   final String brokerAddr,
   final String brokerName,
   final long brokerId,
   final int timeoutMills) {
   TopicConfigSerializeWrapper topicConfigWrapper
       = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
   // 判断是否需要进行注册
   List&lt;Boolean&gt; changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName,
       brokerId, topicConfigWrapper, timeoutMills);
   // 有一个发生了变化,就表示需要注册了    
   boolean needRegister = false;
   for (Boolean changed : changeList) {
       if (changed) {
           needRegister = true;
           break;
       }
   }
   return needRegister;
}

这个方法调用了brokerOuterAPI.needRegister(...)来判断broker是否发生了变化,只要一个NameServer上发生了变化,就说明需要进行注册操作。

brokerOuterAPI.needRegister(...)是如何判断broker是否发生了变化的呢?继续跟进BrokerOuterAPI#needRegister

public List<Boolean> needRegister(
   final String clusterName,
   final String brokerAddr,
   final String brokerName,
   final long brokerId,
   final TopicConfigSerializeWrapper topicConfigWrapper,
   final int timeoutMills) {
   final List<Boolean> changedList = new CopyOnWriteArrayList<>();
   // 获取所有的 nameServer
   List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
   if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
       final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
       // 遍历所有的nameServer,逐一发送请求
       for (final String namesrvAddr : nameServerAddressList) {
           brokerOuterExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       QueryDataVersionRequestHeader requestHeader
                           = new QueryDataVersionRequestHeader();
                       ...
                       // 向nameServer发送消息,命令是 RequestCode.QUERY_DATA_VERSION
                       RemotingCommand request = RemotingCommand
                           .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                       // 把当前的 DataVersion 发到 nameServer    
                       request.setBody(topicConfigWrapper.getDataVersion().encode());
                       // 发请求到nameServer
                       RemotingCommand response = remotingClient
                           .invokeSync(namesrvAddr, request, timeoutMills);
                       DataVersion nameServerDataVersion = null;
                       Boolean changed = false;
                       switch (response.getCode()) {
                           case ResponseCode.SUCCESS: {
                               QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                 (QueryDataVersionResponseHeader) response
                                 .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                               changed = queryDataVersionResponseHeader.getChanged();
                               byte[] body = response.getBody();
                               if (body != null) {
                                   // 拿到 DataVersion
                                   nameServerDataVersion = DataVersion.decode(body, D
                                       ataVersion.class);
                                   // 这里是判断的关键
                                   if (!topicConfigWrapper.getDataVersion()
                                       .equals(nameServerDataVersion)) {
                                       changed = true;
                                   }
                               }
                               if (changed == null || changed) {
                                   changedList.add(Boolean.TRUE);
                               }
                           }
                           default:
                               break;
                       }
                       ...
                   } catch (Exception e) {
                       ...
                   } finally {
                       countDownLatch.countDown();
                   }
               }
           });
       }
       try {
           countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
           log.error("query dataversion from nameserver countDownLatch await Exception", e);
       }
   }
   return changedList;
}

这个方法里,先是遍历所有的nameServer,向每个nameServer都发送一条codeRequestCode.QUERY_DATA_VERSION的参数,参数为当前brokerDataVersion,当nameServer收到消息后,就返回nameServer中保存的、与当前broker对应的DataVersion,当两者版本不相等时,就表明当前broker发生了变化,需要重新注册。

DataVersion是个啥呢?它的部分代码如下:

public class DataVersion extends RemotingSerializable {
   // 时间戳
   private long timestamp = System.currentTimeMillis();
   // 计数器,可以理解为最近的版本号
   private AtomicLong counter = new AtomicLong(0);
   public void nextVersion() {
       this.timestamp = System.currentTimeMillis();
       this.counter.incrementAndGet();
   }
   /**
    * equals 方法,当 timestamp 与 counter 都相等时,则两者相等
    */
   @Override
   public boolean equals(final Object o) {
       if (this == o)
           return true;
       if (o == null || getClass() != o.getClass())
           return false;
       final DataVersion that = (DataVersion) o;
       if (timestamp != that.timestamp) {
           return false;
       }
       if (counter != null && that.counter != null) {
           return counter.longValue() == that.counter.longValue();
       }
       return (null == counter) && (null == that.counter);
   }
   ...
}

DataVersionequals()方法来看,只有当timestampcounter都相等时,两个DataVersion对象才相等。那这两个值会在哪里被修改呢?从DataVersion#nextVersion方法的调用情况来看,引起这两个值的变化主要有两种:

  • broker 上新创建了一个 topic

  • topic的发了的变化

在这两种情况下,DataVersion#nextVersion方法被调用,从而引起DataVersion的改变。DataVersion改变了,就表明当前broker需要向nameServer注册了。

让我们再回到BrokerController#registerBrokerAll(...)方法:

public synchronized void registerBrokerAll(final boolean checkOrderConfig,
       boolean oneway, boolean forceRegister) {
   ...
   // 这里会判断是否需要进行注册
   if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
       this.getBrokerAddr(),
       this.brokerConfig.getBrokerName(),
       this.brokerConfig.getBrokerId(),
       this.brokerConfig.getRegisterBrokerTimeoutMills())) {
       // 进行注册操作    
       doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
   }
}

处理注册的方法为BrokerController#doRegisterBrokerAll,稍微看下它的流程:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
       TopicConfigSerializeWrapper topicConfigWrapper) {
   // 注册
   List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
       this.brokerConfig.getBrokerClusterName(),
       this.getBrokerAddr(),
       this.brokerConfig.getBrokerName(),
       this.brokerConfig.getBrokerId(),
       this.getHAServerAddr(),
       // 这个对象里就包含了当前broker的版本信息
       topicConfigWrapper,
       this.filterServerManager.buildNewFilterServerList(),
       oneway,
       this.brokerConfig.getRegisterBrokerTimeoutMills(),
       this.brokerConfig.isCompressedRegister());
   ...
}

继续跟下去,最终调用的是BrokerOuterAPI#registerBroker方法:

private RegisterBrokerResult registerBroker(
   final String namesrvAddr,
   final boolean oneway,
   final int timeoutMills,
   final RegisterBrokerRequestHeader requestHeader,
   final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException,
   RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
   // 构建请求
   RemotingCommand request = RemotingCommand
       .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
   request.setBody(body);
   // 处理发送操作:sendOneWay
   if (oneway) {
       try {
           // 注册操作
           this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
       } catch (RemotingTooMuchRequestException e) {
           // Ignore
       }
       return null;
       ...
   }
   ....
}

所以,所谓的注册操作,就是当nameServer发送一条codeRequestCode.REGISTER_BROKER的消息,消息里会带上当前brokertopic信息、版本号等。

4.总结

本文主要分析了broker的启动流程,总的来说,启动流程分为3个:

  • 解析配置文件,这一步会解析各种配置,并将其赋值到对应的对象中

  • BrokerController创建及初始化:创建了BrokerController对象,并进行初始化操作,所谓的初始化,就是加载配置文件中配置、创建线程池、注册请求处理器、启动定时任务等

  • BrokerController启动:这一步是启动broker的核心组件,如messageStore(消息存储)、remotingServer(netty服务,用来处理producerconsumer请求)、brokerOuterAPI(netty服务,用来向nameServer上报当前broker信息)等。

在分析启动过程中,重点分析了两类消息的发送:

  • ShutdownHook中,broker会向nameServer发送注销消息,这表明在broker关闭前,nameServer会清除当前broker的注册信息

  • broker启动后,会启动一个定时任务,定期判断是否需要向nameServer注册,判断是否需要注册时,会向nameServer发送codeQUERY_DATA_VERSION的消息,从nameServer得到当前broker的版本号,该版本号与本地版本号不一致,就表示需要向broker重新注册了,即发送注册消息。

参考文章

RocketMQ4.8注释github地址

来源:https://juejin.cn/post/7213307533280395324

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com