软件编程
位置:首页>> 软件编程>> java编程>> RocketMQ消息生产者是如何选择Broker示例详解

RocketMQ消息生产者是如何选择Broker示例详解

作者:梦想实现家_Z  发布时间:2023-11-10 21:45:49 

标签:RocketMQ,消息,生产者,Broker

前言

RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?

从NameServer查询Topic信息

通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

这个方法就是通过topicNameServer拉出对应的Broker信息:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
       TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
       if (null == topicPublishInfo || !topicPublishInfo.ok()) {
           this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
       }
       if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
           return topicPublishInfo;
       } else {
           this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
           topicPublishInfo = this.topicPublishInfoTable.get(topic);
           return topicPublishInfo;
       }
   }

1.一开始的话,是从当前缓存中找Topic信息,第一次肯定是找不到的;

2.找不到Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)NameServer拉对应的信息,如果拉到了就更新到缓存中;

3.如果依然找不到Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;

生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;

如何选择Broker

当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
           int times = 0;
           String[] brokersSent = new String[timesTotal];
           for (; times < timesTotal; times++) {
               String lastBrokerName = null == mq ? null : mq.getBrokerName();
               MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               if (mqSelected != null) {
                   mq = mqSelected;
                   brokersSent[times] = mq.getBrokerName();
                 ......

1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;

2.选择Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;

追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
       if (this.sendLatencyFaultEnable) {
           try {
               int index = tpInfo.getSendWhichQueue().incrementAndGet();
               for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                   int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                   if (pos < 0)
                       pos = 0;
                   MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                   if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                       return mq;
               }
               final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
               int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
               if (writeQueueNums > 0) {
                   final MessageQueue mq = tpInfo.selectOneMessageQueue();
                   if (notBestBroker != null) {
                       mq.setBrokerName(notBestBroker);
                       mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                   }
                   return mq;
               } else {
                   latencyFaultTolerance.remove(notBestBroker);
               }
           } catch (Exception e) {
               log.error("Error occurred when selecting message queue", e);
           }
           return tpInfo.selectOneMessageQueue();
       }
       return tpInfo.selectOneMessageQueue(lastBrokerName);
   }

1.如果开启了延迟故障规避,那么执行规避策略;

  • 1.1:轮询找一个Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);

  • 1.2:如果所有的Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;

  • 1.3:如果依然没有找到合适的Broker,那么就随机选一个Broker

2.否则就随机选一个Broker

下面我们来看一下随机发送的策略是怎么实现的:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
       if (lastBrokerName == null) {
           return selectOneMessageQueue();
       } else {
           for (int i = 0; i < this.messageQueueList.size(); i++) {
               int index = this.sendWhichQueue.incrementAndGet();
               int pos = Math.abs(index) % this.messageQueueList.size();
               if (pos < 0)
                   pos = 0;
               MessageQueue mq = this.messageQueueList.get(pos);
               if (!mq.getBrokerName().equals(lastBrokerName)) {
                   return mq;
               }
           }
           return selectOneMessageQueue();
       }
   }
   public MessageQueue selectOneMessageQueue() {
       int index = this.sendWhichQueue.incrementAndGet();
       int pos = Math.abs(index) % this.messageQueueList.size();
       if (pos < 0)
           pos = 0;
       return this.messageQueueList.get(pos);
   }

1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker,其实就是轮询方式;

2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker服务,也是为了让Broker服务负载均衡;

3.如果没有避开上一次的Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker

小结

通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:

1.第一次发消息,通过轮询的方式选择Broker

2.后续发消息会规避上次的Broker,同样采用轮询的方式选择Broker

3.在消息发送过程中,存在一个Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;

来源:https://juejin.cn/post/7169539055548858398

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com