RocketMQ生产者一个应用不能发送多个NameServer消息解决
作者:梦想实现家_Z 发布时间:2022-05-18 15:56:11
前言
目前有两套RocketMQ集群,集群A包含topic
名称为cluster_A_topic
,集群B包含topic
名称为cluster_B_topic
,在应用服务OrderApp
上通过RocketMQ Client
创建两个DefaultMQProducer
实例发送消息给集群A和集群B
架构图如下:
根据上述架构图,我们给出的示例代码如下:
// 创建第一个DefaultMQProducer
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
// 设置nameServer地址
producer1.setNamesrvAddr("192.168.2.230:9876");
try {
producer1.start();
// 发送消息
SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result1.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_A_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_A_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_A_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_A_topic 副本不可用!");
}
} catch (Exception e) {
e.printStackTrace();
}
// 创建第二个DefaultMQProducer
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
// 设置nameServer地址
producer2.setNamesrvAddr("192.168.2.231:9876");
try {
producer2.start();
// 发送消息
SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
switch (result2.getSendStatus()) {
case SEND_OK:
System.out.println("cluster_B_topic 发送成功!");
break;
case FLUSH_DISK_TIMEOUT:
System.out.println("cluster_B_topic 持久化失败!");
break;
case FLUSH_SLAVE_TIMEOUT:
System.out.println("cluster_B_topic 同步slave失败!");
break;
case SLAVE_NOT_AVAILABLE:
System.out.println("cluster_B_topic 副本不可用!");
}
return "ok";
} catch (Exception e) {
e.printStackTrace();
} finally {
producer1.shutdown();
producer2.shutdown();
}
结果竟然报错了,报错内容时cluster_B_topic
不存在:
经过不断的测试,发现只有放在最前面启动的DefaultMQProducer
会生效,后面启动的DefaultMQProducer
发送消息就报错说对应的topic
不存在,而且报错的broker
竟然是前面启动的DefaultMQProducer
对应的broker
。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?
问题定位
首先说明一下,当前使用的RocketMQ Client
版本是4.8.0
。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸😢]。
我们都知道生产者是发送消息给Broker
的,获取Broker
信息是通过连接NameServer
获取的。既然报错的Broker
和目标Broker
竟然不对应,肯定是后面启动的生产者获取的Broker
不对。有了最基本的判断,我们先从DefaultMQProducer#start()
入手,最终我们定位到这样一段代码DefaultMQProducerImpl#start(final boolean startFactory)
:
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
// 如果生产者group名称不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 创建MQClientInstance实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 注册生产者实例到MQClientInstance中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 添加TBW102对应的topic信息,broker设置autoCreateTopicEnable = true才起作用
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// 启动刚刚创建的MQClientInstance实例
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 修改服务状态为RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
上面的代码主要是创建了MQClientInstance
实例,并且通过start()
方法启动。
通过针对这两段代码的debug,我们发现创建的两个DefaultMQProducer
对象是共用了一个MQClientInstance
实例,并且所有针对NameServer
和Broker
的远程操作全部是通过MQClientInstance
实例来做的。比如发送消息的时候需要找到对应的Broker
下的消息队列:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从NameServer更新topic路由
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
最终我们发现两个DefaultMQProducer
对象都是去同一个NameServer
下获取对应的topic
信息,这下问题就定位到了:因为使用了同一个MQClientInstance
实例导致不同的DefaultMQProducer
去访问了同一个NameServer
,同一个集群需要同时接收两个topic
的消息,也就出现了前面的报错说topic
不存在的情况。
如何解决
我们来看看MQClientInstance
实例是如何保证唯一性的:
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 生成clientID
String clientId = clientConfig.buildMQClientId();
// 从缓存中获取MQClientInstance
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 没有缓存的话就创建一个MQClientInstance
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 新创建出来的再放进缓存
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
// 返回MQClientInstance实例
return instance;
}
我们之所以拿到的MQClientInstance
实例是同一个,是因为在同一个服务下创建的clientId
相同:
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
两个clientId
都是192.168.18.173@14933
,为了防止clientId
相同,我们可以在创建DefaultMQProducer
实例是加上unitName
值,保证两个unitName
值不同来避免共享同一个MQClientInstance
。
DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();
通过上述代码修改后,两个消息都发送成功了。
另一个办法就是升级RocketMQ Client
到4.9.0
,我们来看一下RocketMQ Client 4.9.0
是怎么解决这个问题的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
RocketMQ Client 4.9.0
在后面补充了一个纳秒值,之前的代码是这样的:
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
也就是说,在新的版本中,一个应用服务内创建多个DefaultMQProducer
就会有多个MQClientInstance
实例对应,不会再出现我们前面的报错。
来源:https://juejin.cn/post/7166855806355406879


猜你喜欢
- 模仿网易新闻客户端阅读偏好的频道选择,先看实现的页面: 直接上代码:import android.content.res.Resources
- 1、定义方法的格式 访问修饰符 返回类型 方法名 (参数列表) { // 方法的主体… }2、静态方法在大多数时候,我们定义写一个方法,会把
- 1.@RequestMapping的介绍通过@RequestMapping,我们可以把请求地址和方法进行绑定的,可以在类、方法上进行声明。类
- 前言Java虽然五脏俱全但总有软肋,譬如获取CPU等硬件信息,当然我们可以通过JNI调用C/C++来获取,但对于对C/C++和Windows
- 简单了解计算机为什么要采用二进制表示信息:因为计算机作为一种电子计算机工具,是由大量的电子器件组成的,在这些电子器件中,电路的通断,电位的高
- 今天我们来编写一个缩放效果的ImageView ,网上有很多人都讲了这些。但有许多人都直接使用了库文件,那么我们今天做的是直接上代码编写一个
- 最近看代码,由于代码的调用层级深度比较多,层层深入到某处时,已经忘记了身处何处,虽然自己可以使用一些画图工具来时序图,但是,这种情况下,自己
- 本文实例讲述了Spring Bean的初始化和销毁。分享给大家供大家参考,具体如下:一 点睛在开发过程中,经常遇到在Bean使用之前或者之后
- 项目介绍springboot搭建的访客管理系统,针对高端基地做严格把控来访人员信息管理,用户后端可以设置多个管理员帐号,给予不同部门的管理层
- Android自定义View实现APP启动页倒计时效果,供大家参考,具体内容如下之前也是做过APP启动页的倒计时效果,但是只有文字变化,没有
- 前言缓存主要是为了提高数据的读取速度。因为服务器和应用客户端之间存在着流量的瓶颈,所以读取大容量数据时,使用缓存来直接为客户端服务,可以减少
- 当我们右键点击listview控件时,可以得到选择的项的各个文本内容。现在我们要求只获取右键点击时的单元格的文本内容。方法如下:1、定义全局
- 定义单一职责原则(Single Responsibility Principle, SRP),有且仅有一个原因引起类的变更。简单来说,就是针
- SpringMVC在接收集合请求参数时,需要在Controller方法的集合参数里前添加@RequestBody,而@RequestBody
- 项目中经常会使用到一对多的查询场景,但是PageHelper对这种嵌套查询的支持不够,如果是一对多的列表查询,返回的分页结果是不对的参考Gi
- 实现客户端发送请求,服务器端响应机制UDP客户端代码using System;using System.Text;using System.
- 需要的jar包:数据库代码:create database school character set utf8;use school;CRE
- 近日,Eclipse经常挂掉,都是由于JVM崩溃的原因。每次都有以下错误日志:## A fatal error has been detec
- C#用户定义类型转换•用于自定义类和结构能够进行隐式转换和显示转换.例如:将一个自定义类类型转换成整型,浮点型等,反之亦然.C#提供隐式转换
- 当用户从网络上读取微薄的时候,如果一下子全部加载用户未读的微薄这将耗费比较长的时间,造成不好的用户体验,同时一屏的内容也不足以显示如此多的内