RocketMQ消息生产者是如何选择Broker示例详解
作者:梦想实现家_Z 发布时间:2023-11-10 21:45:49
前言
在RocketMQ
中为,我们创建消息生产者时,只需要设置NameServer
地址,消息就能正确地发送到对应的Broker
中,那么RocketMQ
消息生产者是如何找到Broker
的呢?如果有多个Broker
实例,那么消息发送是如何选择发送到哪个Broker
的呢?
从NameServer查询Topic信息
通过Debug消息发送send()
方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()
这个方法,并且我们找到了最关键的Topic信息:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
这个方法就是通过topic
从NameServer
拉出对应的Broker
信息:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1.一开始的话,是从当前缓存中找Topic
信息,第一次肯定是找不到的;
2.找不到Topic
信息,那么就调用updateTopicRouteInfoFromNameServer(topic)
从NameServer
拉对应的信息,如果拉到了就更新到缓存中;
3.如果依然找不到Topic
信息,说明没有任何Broker
上面是有这个Topic
的;但是我们还要拉开启了自动创建Topic
配置的Broker
信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)
实现;
生产者客户端会从两个地方获取Broker
信息,第一个就是从内存缓存中获取,第二个就是从NameServer
中获取。从NameServer
中分两次获取,一次是获取存在的Topic
对应的Broker
信息,第二次是获取还没有创建出来的Topic
对应的Broker
信息;
如何选择Broker
当客户端拿到了Topic
对应的Broker
信息后,它是如何选择目标Broker
的呢?继续向下看,我们找到了关键代码:
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
......
1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;
2.选择Broker
的关键代码就在selectOneMessageQueue()
方法中,通过前面拿到的topicPublishInfo
作为参数,lastBrokerName
作为额外的考虑参数;
追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()
中:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1.如果开启了延迟故障规避,那么执行规避策略;
1.1:轮询找一个
Broker
,该Broker
要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);1.2:如果所有的
Broker
都没有度过规避期,那么从比较好的那一部分Broker
里面找一个出来;1.3:如果依然没有找到合适的
Broker
,那么就随机选一个Broker
;
2.否则就随机选一个Broker
;
下面我们来看一下随机发送的策略是怎么实现的:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker
,其实就是轮询方式;
2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker
服务,也是为了让Broker
服务负载均衡;
3.如果没有避开上一次的Broker
,那么再向后找另一个Broker
;除非只有一个Broker
服务,否则会尽可能避开上次发送的Broker
;
小结
通过源码分析,我们已经知道了生产者是如何选择目标Broker
的了:
1.第一次发消息,通过轮询的方式选择Broker
;
2.后续发消息会规避上次的Broker
,同样采用轮询的方式选择Broker
;
3.在消息发送过程中,存在一个Broker
规避列表,用户可以通过setSendLatencyFaultEnable(true)
开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker
,如果所有的Broker
都在规避列表中,那么会选择一个相对比较好的Broker
来用;
来源:https://juejin.cn/post/7169539055548858398
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 知识点:1.使用SQL Helper创建数据库2.数据的增删查改(PRDU:Put、Read、Delete、Update)背景知识:上篇文章
- 需求:类似医院或者商场,大屏幕无限轮播item (广告词/广告图…),供大家参考,具体内容如下代码如下/** * Created by Xi
- 今天介绍一个自己做的快递单号查询的简单APP,供大家参考。由于需要使用http和json,本文在build.gradle(module:ap
- 学习Java 本身是一个挺枯燥的过程,说白了就是每天敲代码而已。但如果换一种思路,可以编写各种各样的程序,不仅加深对代码的理解,同时提高兴趣
- 最近有个同事在调用一个类库中的方法时遇到了一个问题,异常信息如下:尝试释放正在使用的RCW,活动线程或其他线程上正在使用该 RCW,释放正在
- C#如何安全、高效地玩转任何种类的内存之Span的本质一、what - 痛点是什么?回答这个问题前,先总结一下如何用C#操作任何类型的内存:
- 本文实例为大家分享了Silverlight实现星星闪烁动画展示的具体代码,供大家参考,具体内容如下原理很简单,生成1000个圆,从随机数来布
- 缓存是HTTP协议的一个强大功能,但由于某些原因,它主要用于静态资源,如图像,CSS样式表或JavaScript文件,但是,HTTP缓存不仅
- 本文实例为大家分享了C#串口通信工具类的封装代码,供大家参考,具体内容如下 1、SerialPortHelper串口工具类封装us
- 在我们使用mybatis plus 时, mybatis plus 可以帮我们自动封装我们的实体类用来查询添加,当我们遇见我们的尸体类名与我
- 一、前言java是一门跨硬件平台的面向对象高级编程语言,java程序运行在java虚拟机上(JVM),由JVM管理内存,这点是和C++最大区
- 前面有写到Spring+SpringMVC+MyBatis深入学习及搭建(二)——MyBatis原始Dao开发和mapper代理开发MyBa
- 本文介绍在使用C#开发WinForm程序时,如何使用自定义的XML配置文件。虽然也可以使用app.config,但命名方面很别扭。我们在使用
- 一、遇到一个问题1、读取CSV文件package com.guor.demo.charset;import java.io.Buffered
- 我这一次讲使用scroll实现弹性滑动,我不会只有一个例子就说完,因为写文章的时候我也在学习,我分几次讲完吧。首先上一段代码,private
- 一、扫雷扫雷小游戏主要是利用字符数组、循环语句和函数实现。设计思路:雷盘大小为9*9,但是为了后续能更好的统计出雷的个数在定义数组的时候定义
- MybatisPlus 给我们提供了更加强大的代码生成器MyBatisPlus 在3.0.3版本之前使用代码生成器因为存在默认依赖,所以不需
- 1. 准备工作需要提前安装好Elasticsearch,访问地址:http://127.0.0.1:9200/ 得到以下结果,得到clust
- 1.Android 加载https请求的网页的时候 打不开当load有ssl层的https页面时,如果这个网站的安全证书在Android无法
- 随着市面上越来越多三方APP的出现,某些手机厂商也开始对这些APP进行了安装限制或者运行限制,或者三方APP自身的版本过低,无法被特定的系统