RocketMQ生产者如何规避故障Broker方式详解
作者:梦想实现家_Z 发布时间:2022-06-23 04:36:10
前言
在消息发送过程中,生产者从NameServer
中获取到了指定Topic
对应的Broker
信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker
有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker
的呢?
收集故障Broker
我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl
类中查找:
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:
// 发送成功的话,isolation传false,失败isolation传true
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
如果Broker
产生故障,那么会创建一个FaultItem
对象记录故障的Broker
,并把结果放进故障规避表faultItemTable
中,数据格式如下:
"broker-a": {
// broker名称
"name": "broker-a",
"currentLatency": 发送消息消耗的时间,毫秒值之差,
// 解除规避的时间,绝对时间
"startTimestamp": 时间戳毫秒值
},
"broker-b": {
// broker名称
"name": "broker-b",
"currentLatency": 发送消息消耗的时间,毫秒值之差,
// 解除规避的时间,绝对时间
"startTimestamp": 时间戳毫秒值
}
发送成功的Broker
设置的故障规避时间为0,发送失败的Broker
将被设置为规避30秒;
选择Broker
在MQFaultStrategy.selectOneMessageQueue()
方法中,我们分三部分来分析如何选择Broker。
轮询选择一个可用的Broker
// 轮询的基本套路,一个自增变量
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 通过对队列数量取模,获取选定的Broker所在的位置
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;
2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;
选择一个相对延迟低的Broker
// 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判断该Broker是否允许写消息
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
// 返回选中的Broker
return mq;
}
1.从规避列表中找到延时比较低的Broker;
2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;
默认的选择
return tpInfo.selectOneMessageQueue();
最后直接轮询一个Broker
直接返回:
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;
小结
RocketMQ
通过设置故障规避表的方式,把所有的Broker
的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:
1.优先选择不在规避时间范围内的Broker
;
2.如果所有Broker
都在规避时间内,优先选择延迟低的Broker
;
3.如果依然没有选中合适的Broker
,那么就直接挑一个Broker
来用;
来源:https://juejin.cn/post/7169958001225400333


猜你喜欢
- 前言记录下SpringBoot修改yml配置文件后无需重启服务的方式(打包后生效),效果如下:具体实现实现代码pom.xml<depe
- 目录1. 简单认识BeanProcessorBeanProcessor的理解一个BeanProcessor的使用实例2. BeanProce
- 具体实现过程不多说了,直接贴代码了。using System;using System.Collections.Generic;using
- 本文实例形式介绍了VB.NET中TextBox的智能感知实现方法,功能非常实用,具体如下:该实例主要实现:在TextBox中键入字符,可以智
- 一、重载 1、重载示意 重载(Overload)是重新加载的意思,指的是同一个类中同名,但是参
- 项目需要从其他网站获取数据,因为是临时加的需求,在开始项目时没想到需要多数据源于是百度了一下,发现只需要改动一下Spring 的applic
- 前言作为一个开发者,日常会接触到很多优秀的软件,其实,或多或少会有这样的想法,我能不能开发一个自己软件,甚至办公软件都希望是Markdown
- 一.代码实现1. “Activity_11\src\yan\activity_11\MainActivity.java”pack
- 本文解析了C# KeyUp事件中MessageBox的回车(Enter)键出现回调问题的解决办法。具体问题如下:在一个窗体上有一个名为txt
- 有时候在单机部署,或者项目没有在IDea 开发工具中运行(idea可以自动打开tomcat项目),需要项目启动后自动打开浏览器访问项目,配置
- 一、获取apikey,appsecret与商户号注册公众号、商户号二、获取用户的OpenId1.设置【授权回调页面域名】官方解释:用户在网页
- 什么是进程?当一个程序开始运行时,它就是一个进程,进程包括运行中的程序和程序所使用到的内存和系统资源。而一个进程又是由多个线程所组成的。什么
- 本文实例讲述了C#实现动态生成静态页面的类。分享给大家供大家参考,具体如下:动态生成静态页面有许多好处,比如生成html网页有利于被搜索引擎
- 配置文件m103替换为hdfs服务地址。要利用Java客户端来存取HDFS上的文件,不得不说的是配置文件hadoop-0.20.2/conf
- Android 使用mediaplayer播放res/raw文件夹中的音乐的实例(1)在res文件夹中新建一个文件夹重命名为raw,并且将要
- 本文实例为大家分享了C#线程倒计时器源码,供大家参考,具体内容如下using System;using System.Collections
- 一条SQL使用两个foreach的问题未修改前的 SQL 语句<select id="findQuestionType_3_
- 前言:封装、继承和多态是面向对象编程的三大特征。1.封装1.1.封装概念封装就是把抽象出的数据(属性)和对数据的操作(方法)封装在一起,数据
- 在上面介绍过栈(Stack)的存储结构,接下来介绍另一种存储结构字典(Dictionary)。 字典(Dictionary)里面的每一个元素
- 1.查询(get)-调用的时候记得开线程GET一般用于获取/查询资源信息val sb = StringBuffer() try { &nbs