RocketMQ之Consumer整体介绍启动源码分析
作者:林师傅 发布时间:2022-06-04 03:53:31
前言
从本篇文章开始,我们将逐步开始分析Consumer的源码,首先我们将整体介绍Consumer的接口和相关实现类以及DefaultMQPushConsumer的主要API和关键属性,然后我们将分析Consumer的启动过程源码,通过对启动过程的分析,之前我们分析过Producer和Broker的启动源码,Consumer的启动源码与Producer还是有很多相似的地方。
Consumer整体介绍
Consumer实现类
RocketMQ给我们提供的Consumer实现类如下图所示,包括推送式的DefaultMQPushConsumer
和拉取式的DefaultMQPullConsumer
、DefaultLitePullConsumer
,从图中可以看到DefaultMQPullConsumer
已经被标注为deprecated,如果需要使用拉取式的Consumer,官方推荐使用DefaultLitePullConsumer。
Consumer消费类型
拉取式消费
Consumer主动从Broker拉去消息,消费消息的主动权由Consumer控制。一旦获取了批量消息,就会启动消费过程。不过这种方式实时性较弱,即Broker中有了新的消息时消费者并不能及时发现并消费。
推送式消费
该模式下Broker收到数据后会主动推送给Consumer,这种方式一般实时性比较高。
RocketMQ官方更推荐我们在日常工作中使用DefaultMQPushConsumer
,它已经能够满足我们大多数使用场景。从技术上讲,这个DefaultMQPushConsumer
客户端实际上是底层拉取服务的包装器。当从代理中提取的消息到达时,它大致调用注册的回调处理程序来馈送消息。本篇文章,我们将介绍DefaultMQPushConsumer
的启动流程
DefaultMQPushConsumer主要API
DefaultMQPushConsumer实现了MQConsumer和MQPushConsumer接口,DefaultMQPushConsumer的主要API都在这两个接口中定义了,如下所示
// org.apache.rocketmq.client.consumer.MQConsumer
public interface MQConsumer extends MQAdmin {
// 如果消费失败,消息将被发送回代理,并延迟消耗一些时间
void sendMessageBack(final MessageExt msg/*消息*/, final int delayLevel/*延迟级别*/, final String brokerName);
// 根据topic从使用者缓存中获取消息队列
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}
// org.apache.rocketmq.client.consumer.MQPushConsumer
public interface MQPushConsumer extends MQConsumer {
// 启动Consumer
void start() throws MQClientException;
// 关闭Consumer
void shutdown();
// 注册并发消息Listener
void registerMessageListener(final MessageListenerConcurrently messageListener);
// 注册顺序消息Listener,将会有序地接收消息。一个队列一个线程
void registerMessageListener(final MessageListenerOrderly messageListener);
// 订阅Topic
void subscribe(final String topic, final String subExpression) throws MQClientException;
// 退订topic
void unsubscribe(final String topic);
}
DefaultMQPushConsumer关键属性
DefaultMQPushConsumer的关键属性如下所示
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
// DefaultMQPushConsumer的默认实现,DefaultMQPushConsumer中大部分功能都是对它的代理
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 相同角色的消费者需要具有完全相同的subscriptions和consumerGroup才能正确实现负载平衡,它需要全局唯一
private String consumerGroup;
// 消息模型定义了如何将消息传递到每个消费者客户端的方式,默认是集群模式
private MessageModel messageModel = MessageModel.CLUSTERING;
// 第一次消费时指定的消费策略,默认是CONSUME_FROM_LAST_OFFSET
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 队列分配算法,指定如何将消息队列分配给每个使用者客户端。
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// 订阅关系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息 *
private MessageListener messageListener;
// 消息消费进度存储器
private OffsetStore offsetStore;
// 最小消费线程数
private int consumeThreadMin = 20;
// 最大消费线程数
private int consumeThreadMax = 20;
// 推送模式下拉去消息的间隔时间,默认一次拉取消息完成后立刻继续拉取
private long pullInterval = 0;
// 批量消费数量
private int consumeMessageBatchMaxSize = 1;
// 批量拉取的数量
private int pullBatchSize = 32;
// 每次拉取时是否更新订阅关系,默认是false
private boolean postSubscriptionWhenPull = false;
// 消息最大重试次数,如果消息消费最大次数超过maxReconsumeTimes还未成功,则消息将被转移到一个失败队列
private int maxReconsumeTimes = -1;
//延迟将该队列的消息提交到消费者线程的等待时间,默认延迟1s
private long suspendCurrentQueueTimeMillis = 1000;
// 消息阻塞消费线程的最大超时时间,默认15分钟
private long consumeTimeout = 15;
// 关闭使用者时等待消息的最长时间,0表示没有等待。
private long awaitTerminationMillisWhenShutdown = 0;
}
Consumer消费模式
Consumer提供下面两种消费模式,由上面DefaultMQPushConsumer的messageModel定义
广播模式(BROADCASTING)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息会被相同Consumer Group中的所有Consumer消费
集群模式(CLUSTERING)
集群模式是Consumer默认的消费模式,集群消费模式下,相同Consumer Group的每个Consumer按照负载均衡策略分摊同一个Topic消息,即每条消息只会被相同Consumer Group中的一个Consumer消费
Consumer消费策略
Consumer主要提了下面三种消费策略
CONSUME_FROM_LAST_OFFSET
这是Consumer默认的消费策略,它分为两种情况,如果Broker的磁盘消息未过期且未被删除,则从最小偏移量开始消费。如果磁盘已过期,并被删除,则从最大偏移量开始消费。
CONSUME_FROM_FIRST_OFFSET
从最早可用的消息开始消费
CONSUME_FROM_TIMESTAMP
从指定的时间戳开始消费,这意味着在consumeTimestamp之前生成的消息将被忽略
Consumer使用
要使用Consumer开始消费消息,至少需要下面5个步骤
public static void main(String[] args) throws MQClientException {
// 1. 传入CONSUMER_GROUP,创建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 2. 设置namesrvAddr
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3. 订阅Topic
consumer.subscribe(TOPIC, "*");
// 4.注册消息Listener
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 5.启动Consumer
consumer.start();
}
DefaultMQPushConsumer源码分析
启动源码分析
DefaultMQPushConsumer只是设置属性,Consumer的初始化实际是在DefaultMQPushConsumer#start
中执行的,DefaultMQPushConsumer#start
实际调用了DefaultMQPushConsumerImpl#start
执行初始化。
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
public void start() throws MQClientException {
// consumerGroup封装namespace
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// DefaultMQPushConsumerImpl启动
this.defaultMQPushConsumerImpl.start();
// 消息轨迹跟踪服务,默认null
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
下面我们来分步骤分析DefaultMQPushConsumerImpl#start
代码
第一步:
先将Consumer的状态更新为START_FAILED
校验Consumer的配置。主要校验ConsumerGroup,
消费模式校验(MessageModel),消费开始位置(ConsumeFromWhere),消费时间戳(默认是半小时之前),队列分配策略(默认是AllocateMessageQueueAveragely),订阅Topic和Subscription关系校验,消息 * (MessageListener)校验等。
将Consumer中的订阅关系拷贝到RebalanceImpl中,Consumer中订阅关系的来源主要包括
DefaultMQPushConsumerImpl#subscribe
方法获取,也会订阅重试topic,其主题名为%RETRY%+消费者组名
,消费者启动时会自动订阅该主题如果是集群模式,则修改消费者名称为
PID#时间戳
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
//...
// 状态先设置为启动失败
this.serviceState = ServiceState.START_FAILED;
// 校验配置,ConsumerGroup校验,
this.checkConfig();
// 订阅关系copy到RebalanceImpl中
this.copySubscription();
// 如果是集群模式,消费者名称如果是DEFAULT,则会改成:PID#时间戳
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//...
}
第二步:
主要是初始化MQClientInstance、RebalanceImpl和pullAPIWrapper。
**MQClientInstance:**是消息拉取服务,主要用于拉取消息,同一个进程内的所有Consumer会使用同一个MQClientInstance
**RebalanceImpl:**是消费者负载均衡服务,用于确定消费者消费的消息队列以及负载均衡。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 生成一个MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 设置消费者组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 消息消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置消息消费模式
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 构建拉消息包装器
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
第三步:
根据消息消费模式的不同设置不同的消息消费进度存储器(OffsetStore),如果是广播模式,则使用LocalFileOffsetStore作为消息进度存储器,如果是集群模式则使用RemoteBrokerOffsetStore作为消息进度存储器。创建完成之后调用load()方法加载偏移量,如果是LocalFileOffsetStore将会从本地加载。
广播模式下:LocalFileOffsetStore将消费进度存储在Consumer本地的${user.home}/.rocketmq_offsets/clientId/consumerGroup/offsets.json
文件中
集群模式下:RemoteBrokerOffsetStore将消费进度存储在Broker
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 如果是广播模式,则使用LocalFileOffsetStore存储偏移量
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// 如果是集群模式,则使用RemoteBrokerOffsetStore存储偏移量
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 如果是广播模式,则从本地文件load偏移量,如果是集群模式则是一个空实现
this.offsetStore.load();
第四步:
根据消息 * 的类型不同创建不同的消息消费服务(并发/顺序消息消费服务),并启动。然后注册消费者组和消费者信息到MQClientInstance中的consumerTable中,注册成功后启动MQClientInstance客户端通信实例。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 如果是顺序消费
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
// 如果是并发消费
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 将自身注册到MQClientInstance
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// ...
mQClientFactory.start();
第五步:
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 向Namesrv拉取并更新当前消费者订阅topic路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确
this.mQClientFactory.checkClientInBroker();
// 给所有Broker发送心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒负载均衡服务rebalanceService,并进行rebalance
this.mQClientFactory.rebalanceImmediately();
来源:https://juejin.cn/post/7230558835228278843
猜你喜欢
- 为什么使用logback记得前几年工作的时候,公司使用的日志框架还是log4j,大约从16年中到现在,不管是我参与的别人已经搭建好的项目还是
- 昨天有朋友在公众号发消息说看不懂await,async执行流,其实看不懂太正常了,因为你没经过社会的毒打,没吃过牢饭就不知道自由有多重要,没
- 本文实例讲述了Java实现的简单音乐播放器功能。分享给大家供大家参考,具体如下:应用名称:Java简单的音乐播放器用到的知识:Java GU
- 实践过程效果代码public partial class Form1 : Form{ private HookEx
- 介绍try-with-resources是Java中的环绕语句之一,旨在减轻开发人员释放try块中使用的资源的义务。它最初在Java 7中引
- Unity 跑马灯抽奖效果实现代码,供大家参考,具体内容如下这边用到插件是NGUI+Dotween,思路简单说下:先排版,通过移动图片蒙版来
- 1.Fork/Join框架简介Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并
- 一.抽象类(一)概念 在继承的层次结构中,每个新的子类都使类变得更加明确和具体。如果从一个子类向父类
- zip 是一个非常常见的压缩包格式,本文主要用于说明如何使用代码 文件或文件夹压缩为 zip压缩包及其解压操作,我们采用的是 微软官方的实现
- 相信大家对SaaS架构都有所了解,这里也不过多介绍,让我们直奔主题。技术框架springboot版本为2.3.4.RELEASE持久层采用J
- 1.过滤器 (Filter)过滤器的配置比较简单,直接实现Filter 接口即可,也可以通过@WebFilter注解实现对特定URL拦截,看
- 在线用户使用HttpSessionListener * 统计 每当一个session会话建立 在线用户人数+1每当一
- 网上找了几个,写的都不太适合,有的写出来了,也没有给出参考的算法链接。这样就导致了如果产生错误我们无法排查(不懂原理怎么排查对吧)。如果在使
- 前言哈喽,我是小黑, 最近学了java的输入输出流后一直心痒痒,总想找一点事情来做,所以用java代码来实现了一下统计代码的所有行数,看一下
- 🍊一. 为什么需要线程通信线程是并发并行的执行,表现出来是线程随机执行,但是我们在实际应用中对线程的执行顺序是有要求的,这就需要用到线程通信
- 什么是事务处理事务是计算机应用中不可或缺的组件模型,它保证了用户操作的原子性 ( Atomicity )、一致性 ( Consistency
- 前言本文主要给大家介绍了关于C#中foreach遍历的用法以及c#使用foreach需要知道的一些事,分享出来供大家参考学习,下面话不多说了
- 概述在移动应用开发中,消息推送可以说是一项非常重要的功能,它能够起到提醒或者唤醒用户的作用,同时也是产品运营人员更高效地实现运营目标的重要手
- 作为一位开发人员,都要有严格的代码规范。为此我总结了一些代码规范案例。目 录1. 前言2. 试用范围3. JAVA命名规范--3.1 公共约
- 背景之前和同事讨论一个问题,他们公司调研中发现forEach的速度比for的速度慢,当刚听到这个结论的时候有点诧异。因为之前看过国外的文章和