软件编程
位置:首页>> 软件编程>> java编程>> RocketMQ producer发送者浅析

RocketMQ producer发送者浅析

作者:Acqierement  发布时间:2023-04-03 06:35:32 

标签:RocketMQ,producer,发送者

发送者其实比较简单,需要做的就是首先确定往哪里发送,其次怎么让消息发送顺畅。我们就看一下具体的代码吧。

首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。

启动的流程主要在这个方法中:

MQClientInstance#start

public void start() throws MQClientException {
   synchronized (this) {
       switch (this.serviceState) {
           case CREATE_JUST:
               this.serviceState = ServiceState.START_FAILED;
               // If not specified,looking address from name server
               if (null == this.clientConfig.getNamesrvAddr()) {
                   this.mQClientAPIImpl.fetchNameServerAddr();
               }
               // Start request-response channel
               this.mQClientAPIImpl.start();
               // Start various schedule tasks
               this.startScheduledTask();
               // Start pull service
               this.pullMessageService.start();
               // Start rebalance service
               this.rebalanceService.start();
               // Start push service
               this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
               log.info("the client factory [{}] start OK", this.clientId);
               this.serviceState = ServiceState.RUNNING;
               break;
           case START_FAILED:
               throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
           default:
               break;
       }
   }
}

其中启动了一系列定时任务,包括org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer这个方法

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
       DefaultMQProducer defaultMQProducer) {
       try {
           if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
               try {
                   TopicRouteData topicRouteData;
                   if (isDefault && defaultMQProducer != null) {
                       // 从nameServer获取topciRouteData
                       topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                           clientConfig.getMqClientApiTimeout());
                       if (topicRouteData != null) {
                           for (QueueData data : topicRouteData.getQueueDatas()) {
                               int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                               data.setReadQueueNums(queueNums);
                               data.setWriteQueueNums(queueNums);
                           }
                       }
                   } else {
                       topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                   }
                   if (topicRouteData != null) {
                       TopicRouteData old = this.topicRouteTable.get(topic);
                       boolean changed = topicRouteData.topicRouteDataChanged(old);
                       if (!changed) {
                           changed = this.isNeedUpdateTopicRouteInfo(topic);
                       } else {
                           log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                       }
                       if (changed) {
                           for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                               this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                           }
                           // Update endpoint map
                           {
                               ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                               if (!mqEndPoints.isEmpty()) {
                                   topicEndPointsTable.put(topic, mqEndPoints);
                               }
                           }
                           // Update Pub info
                           {
                               // 生成topicPublishInfo
                               TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                               publishInfo.setHaveTopicRouterInfo(true);
                               for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                                   MQProducerInner impl = entry.getValue();
                                   if (impl != null) {
                                       // 更新 topicPublishInfo
                                       impl.updateTopicPublishInfo(topic, publishInfo);
                                   }
                               }
                           }
                           // Update sub info
                           if (!consumerTable.isEmpty()) {
                               Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                               for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                                   MQConsumerInner impl = entry.getValue();
                                   if (impl != null) {
                                       impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                   }
                               }
                           }
                           TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                           log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                           this.topicRouteTable.put(topic, cloneTopicRouteData);
                           return true;
                       }
                   } else {
                       log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                   }
               } catch (MQClientException e) {
                   if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                       log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                   }
               } catch (RemotingException e) {
                   log.error("updateTopicRouteInfoFromNameServer Exception", e);
                   throw new IllegalStateException(e);
               } finally {
                   this.lockNamesrv.unlock();
               }
           } else {
               log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
           }
       } catch (InterruptedException e) {
           log.warn("updateTopicRouteInfoFromNameServer Exception", e);
       }
       return false;
   }

通过方法名也知道是从nameServer获取这个topic相关的broke数据,拿到TopicRouteData数据。先更新brokerAddrTable,存储borker具体的地址。然后在org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo里面再进一步生成TopicPublishInfo数据。TopicPublishInfo是对TopicRouteData的一个封装,除了TopicRouteData,还有messageQueue数据,messageQueue是Queue和Borker的交集,会根据配置的queue数量,生成具体的messageQueue,queueId就是0,1,2,3,4他们自己的顺序。

所以有了TopicPublishInfo数据,就知道往哪里发了。

发送消息的过程。

  • 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。

  • 从MessageQueueList里面拿到一个messageQueue。 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。

  • 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址就是前面提到的brokerAddrTable变量,在updateTopicRouteInfoFromNameServer方法里面维护的。

来源:https://blog.csdn.net/weixin_43094917/article/details/129934869

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com