java rocketmq--消息的产生(普通消息)
作者:有爱jj 发布时间:2023-10-19 08:51:50
前言
与消息发送紧密相关的几行代码:
1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
2. producer.start();
3. Message msg = new Message(...)
4. SendResult sendResult = producer.send(msg);
5. producer.shutdown();
那这几行代码执行时,背后都做了什么?
一. 首先是DefaultMQProducer.start
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
调用了默认生成消息的实现类 -- DefaultMQProducerImpl
调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()
另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。
efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:
上图中的三个部分中涉及的内容:
1.1 初始化MQClientInstance
一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1.2 注册producer
该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:
-- selectProducer
-- updateTopicRouteInfoFromNameServer
-- prepareHeartbeatData
-- isNeedUpdateTopicRouteInfo
-- shutdown
注:
根据不同的clientId,MQClientManager将给出不同的MQClientInstance;
根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer
1.3 向路由信息表中添加路由
topicPublishInfoTable定义:
public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。
1.4 发送心跳包
MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。
sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}
二、. SendResult sendResult = producer.send(msg)
首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
sendDefaultImpl做了啥?
2.1. 获取topicPublishInfo
根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:
首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。
2.2 选择消息发送的队列
普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。
它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。
顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}
事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。
2.3 封装消息体通信包,发送数据包
首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :
brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)
可知获取的broker均为master(id=0)
然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE
根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。
封装消息请求包的包头:
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
发送消息包(普通消息默认为同步方式):
SendResult sendResult = null;
switch (communicationMode) {
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout,
communicationMode,
context,
this);
break;
处理来自broker端的响应数据包:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
broker端处理request数据包后会将消息存储到commitLog,具体过程后续分析。
来源:https://www.cnblogs.com/chenjunjie12321/p/7728434.html


猜你喜欢
- 简介Android Studio升级到3.0后,有不少的改动和新特性,先贴出官方的迁移说明。本文会持续收集与总结本人在使用Android S
- 在实战中学习Spring,本系列的最终目的是完成一个实现用户注册登录功能的项目。预想的基本流程如下:1、用户网站注册,填写用户名、密码、em
- 本文实例讲述了C#动态加载dll扩展系统功能的方法。分享给大家供大家参考。具体分析如下:动态加载dll,主要是为了扩展功能,增强灵活性而实现
- 目录一、字节码文件 与 JVM二、字节码文件示例三、字节码文件二进制结构分析1、魔数2、次版本号3、主版本号4、常量池个数总结一、字节码文件
- 1、应用场景:从一份html文件中或从String(是html内容)中提取纯文本,去掉网页标签;2、代码一:replaceAll搞定//从h
- public class User { public
- 在C#中,当引用类型需要转换的时候,经常会用到关键字is、as以及显式强转。本篇来体验这三者的用法。先来梳理.NET引用类型转换的"
- Android SDK已经提供有进度条组件ProgressDialog组件,但用的时候我们会发现可能风格与我们应用的整体风格不太搭配,而且P
- Mybatis log printf工具网页地址: http://www.feedme.ltd/log.htmlMybatis执行的sql的
- 通常我们在进行数据绑定的时候,常用的数据源有DataSet、DataTable、BindingList<T>、还有强类型数据源。
- 一、介绍本教程将介绍如何使用 mybatis-plus 工具自动给我们生成 Controller、Service、Entity、Mapper
- 实例如下:import java.util.ArrayList;import java.util.HashSet;import java.u
- 引言二分查找底层依赖的是数组随机访问的特性,所以只能用数组来实现。如果数据存储在链表中,就真的没法用二分查找算法了吗?实际上,只需要对链表稍
- launch 是 CoroutineScope 的一个扩展函数,该方法在不阻塞当前线程的情况下启动新的协程,launch 里面的代码虽然有挂
- 使用Spring3 实现用户登录以及权限认证这里我就简单介绍一下,我在实现的时候处理的一些主要的实现。1.用户登录 <form act
- 前言你好! 本文章主要介绍如何用Android Studio制作简易的门户界面,主要说明框架的各部分功能与实现过程,结尾处附有源码。界面分析
- Tab与TabHost:这就是Tab,而盛放Tab的容器就是TabHost 。如何实现?? 每一个Tab还对应了一个布局,这个就有点好玩了。
- 委托这个东西不是很好理解,可是工作中又经常用到,你随处可以看到它的身影,真让人有一种又爱又恨的感觉,我相信许多人被它所困扰过。一提到委托,如
- 1. 引入依赖pom文件引入activemq依赖<!--activeMq配置--> &
- 一、隐藏标题栏 //隐藏标题栏 &