RocketMq深入分析讲解两种削峰方式
作者:氵奄不死的鱼 发布时间:2023-04-04 01:38:47
何时需要削峰
当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求
通过消息队列的削峰方法有两种
控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度
通过消费者参数控制消费速度
先分析那些参数对控制消费速度有作用
1.PullInterval: 设置消费端,拉取mq消息的间隔时间。
注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息
源码分析:
首先需要了解rocketMq的消息拉去过程
拉去消息的类
PullMessageService
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullMessageServiceScheduledThread");
}
});
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
public void executeTaskLater(final Runnable r, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
@Override
public void shutdown(boolean interrupt) {
super.shutdown(interrupt);
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
}
@Override
public String getServiceName() {
return PullMessageService.class.getSimpleName();
}
}
继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。
executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。
那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数
消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。
消费端每次pull到消息总数=PullBatchSize*监听队列数
源码分析
消费者拉取消息时
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中
会执行
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。
3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。
源码分析:
还是在消费者拉取消息成功时
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
通过consumeMessageService执行
默认情况下是并发消费
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
其中consumeExecutor初始化
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。
以上三种情况:是针对参数配置,来调整消费速度。
除了这三种情况外还有两种服务部署情况,可以调整消费速度:
4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数
可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量
5.消费端节点部署数量 :
部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。
消费延时控流
针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:
单节点部署:
ConsumInterval :延时时间单位毫秒
ConcurrentThreadNumber:消费端线程数量
MaxRate :理论每秒处理数量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得
如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得
200 = 1 / 0.1 * 20
由上可知,理论上可以将并发消费控制在 200 以下
如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。
如下延时流控代码:
/**
* 测试mq 并发 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上0.1秒");
Thread.sleep(100);
long begin = System.currentTimeMillis();
System.out.println("mq消费速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}
注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。
但是要注意实际操作中并发流控实际是默认存在的,
spring boot 消费端默认配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;
若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000
这里默认拉去的速度1s内远大于1000
注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000
所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义
使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。
来源:https://blog.csdn.net/qq_37436172/article/details/128669909
猜你喜欢
- 前言本文主要给大家介绍了关于Spring Boot优化内嵌Tomcat的相关内容,分享出来供大家参考学习,下面话不多说了,来一看看详细的介绍
- 前言反射和注解在java中偏高级用法,一般在各种框架中被广泛应用,文章简单介绍下反射和注解的用法,希望对你的工作学习有一定帮助java注解什
- /** * 日期工具类 * 默认使用 "yyyy-MM-dd HH:mm:ss" 格式化日期&nbs
- 前言在电商的应用中,最常见的就是在首页或完成某事件之后,弹出一堆的活动/广告。假如重叠弹出,很丑,给用户的体验也不好,所以一般都会依次依条件
- 首先下载newtonsoft.json,增加引用using newtonsoft.json.linq;把jobject的内容提取出来,job
- 一、MyBatis的增删改查1.1、新增<!--int insertUser();--><insert id="
- 1、定时器推动整个计算机硬件的发展的核心关键性技术就是时钟。所以在企业开发中定时操作往往成为开发重点。而在JDK本身也支持这种定时调度的处理
- NumberFormat.getInstance()方法返回NumberFormat的一个实例(实际上是NumberFormat具体的一个子
- 初步探索首先我们要了解equals方法是什么,hashcode方法是什么。equals方法equals 是java的obejct类的一个方法
- 本文实例讲述了Spring实战之ResourceLoader接口资源加载用法。分享给大家供大家参考,具体如下:一 代码package lee
- 集合嵌套查询和集合嵌套结果的区别嵌套查询是多条sql语句分开写并配置,嵌套结果是一条sql语句关联查询并配置,实质效果是一样的。嵌套语句的查
- MyBatis的前身叫iBatis,本是apache的一个开源项目, 2010年这个项目由apache software foundatio
- typora-copy-images-to: ./一键清除maven仓库中下载失败的jar包maven是一款非常优秀的项目管理工具,特别是其
- 线程可以有六种状态:1.New(新创建)2.Runnable(可运行)(运行)3.Blocked(被阻塞)4.Waiting(等待)5.Ti
- 目录简介普通邮件导入javax.mail依赖Spring配置类邮件配置:mail.properties测试基于thymeleaf模版的邮件导
- 初级技巧 - 乐观锁乐观锁适合这样的场景:读不会冲突,写会冲突。同时读的频率远大于写。以下面的代码为例,悲观锁的实现:public Obje
- 定义:将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。类型:创建类模式类图:四个要素产品类:一般是一个较为复杂的对
- 在该示例中,阐述了SpringMVC如何上传文件。1、上传页面upload.jsp<body> <form
- java加载properties文件的六种方法总结java加载properties文件的六中基本方式实现java加载properties文件
- 使用Postman传递arraylist数据给springboot起因:需要做一个批量删除的功能,后台接收一个ArrayList ids 。