软件编程
位置:首页>> 软件编程>> java编程>> RocketMq深入分析讲解两种削峰方式

RocketMq深入分析讲解两种削峰方式

作者:氵奄不死的鱼  发布时间:2023-04-04 01:38:47 

标签:RocketMq,削峰

何时需要削峰

当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求

通过消息队列的削峰方法有两种

控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度

通过消费者参数控制消费速度

先分析那些参数对控制消费速度有作用

1.PullInterval: 设置消费端,拉取mq消息的间隔时间。

注意:该时间算起时间是rocketMq消费者从broker消息后算起。经过PullInterval再次向broker拉去消息

源码分析:

首先需要了解rocketMq的消息拉去过程

拉去消息的类

PullMessageService

public class PullMessageService extends ServiceThread {
   private final InternalLogger log = ClientLogger.getLog();
   private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
   private final MQClientInstance mQClientFactory;
   private final ScheduledExecutorService scheduledExecutorService = Executors
   .newSingleThreadScheduledExecutor(new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
           return new Thread(r, "PullMessageServiceScheduledThread");
       }
   });
   public PullMessageService(MQClientInstance mQClientFactory) {
       this.mQClientFactory = mQClientFactory;
   }
   public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
       if (!isStopped()) {
           this.scheduledExecutorService.schedule(new Runnable() {
               @Override
               public void run() {
                   PullMessageService.this.executePullRequestImmediately(pullRequest);
               }
           }, timeDelay, TimeUnit.MILLISECONDS);
       } else {
           log.warn("PullMessageServiceScheduledThread has shutdown");
       }
   }
   public void executePullRequestImmediately(final PullRequest pullRequest) {
       try {
           this.pullRequestQueue.put(pullRequest);
       } catch (InterruptedException e) {
           log.error("executePullRequestImmediately pullRequestQueue.put", e);
       }
   }
   public void executeTaskLater(final Runnable r, final long timeDelay) {
       if (!isStopped()) {
           this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
       } else {
           log.warn("PullMessageServiceScheduledThread has shutdown");
       }
   }
   public ScheduledExecutorService getScheduledExecutorService() {
       return scheduledExecutorService;
   }
   private void pullMessage(final PullRequest pullRequest) {
       final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
       if (consumer != null) {
           DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
           impl.pullMessage(pullRequest);
       } else {
           log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
       }
   }
   @Override
   public void run() {
       log.info(this.getServiceName() + " service started");
       while (!this.isStopped()) {
           try {
               PullRequest pullRequest = this.pullRequestQueue.take();
               this.pullMessage(pullRequest);
           } catch (InterruptedException ignored) {
           } catch (Exception e) {
               log.error("Pull Message Service Run Method exception", e);
           }
       }
       log.info(this.getServiceName() + " service end");
   }
   @Override
   public void shutdown(boolean interrupt) {
       super.shutdown(interrupt);
                      ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
                      }
                      @Override
                      public String getServiceName() {
                      return PullMessageService.class.getSimpleName();
                      }
                      }

继承自ServiceThread,这是一个单线程执行的service,不断获取阻塞队列中的pullRequest,进行消息拉取。

executePullRequestLater会延时将pullrequest放入到pullRequestQueue,达到延时拉去的目的。

那么PullInterval参数就是根据这个功能发挥的作用,在消费者拉去消息成功的回调

PullCallback pullCallback = new PullCallback() {
           @Override
           public void onSuccess(PullResult pullResult) {
               if (pullResult != null) {
                   pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                       subscriptionData);
                   switch (pullResult.getPullStatus()) {
                       case FOUND:
                           long prevRequestOffset = pullRequest.getNextOffset();
                           pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                           long pullRT = System.currentTimeMillis() - beginTimestamp;
                           DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                               pullRequest.getMessageQueue().getTopic(), pullRT);
                           long firstMsgOffset = Long.MAX_VALUE;
                           if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                               DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                           } else {
                               firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                               DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                   pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                               boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                   pullResult.getMsgFoundList(),
                                   processQueue,
                                   pullRequest.getMessageQueue(),
                                   dispatchToConsume);
                               if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                   DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                       DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                               } else {
                                   DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                               }
                           }
                           if (pullResult.getNextBeginOffset() < prevRequestOffset
                               || firstMsgOffset < prevRequestOffset) {
                               log.warn(
                                   "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                   pullResult.getNextBeginOffset(),
                                   firstMsgOffset,
                                   prevRequestOffset);
                           }
                           break;
                       case NO_NEW_MSG:
                           pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                           DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                           DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                           break;
                       case NO_MATCHED_MSG:
                           pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                           DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                           DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                           break;
                       case OFFSET_ILLEGAL:
                           log.warn("the pull request offset illegal, {} {}",
                               pullRequest.toString(), pullResult.toString());
                           pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                           pullRequest.getProcessQueue().setDropped(true);
                           DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                               @Override
                               public void run() {
                                   try {
                                       DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                           pullRequest.getNextOffset(), false);
                                       DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                       DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                       log.warn("fix the pull request offset, {}", pullRequest);
                                   } catch (Throwable e) {
                                       log.error("executeTaskLater Exception", e);
                                   }
                               }
                           }, 10000);
                           break;
                       default:
                           break;
                   }
               }
           }
           @Override
           public void onException(Throwable e) {
               if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                   log.warn("execute the pull request exception", e);
               }
               DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
           }
       };

在 case found的情况下,也就是拉取到消息的q情况,在PullInterval>0的情况下,会延时投递到pullRequestQueue中,实现拉取消息的间隔

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                   DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                       DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                               } else {
                                   DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                               }

2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数

消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。

消费端每次pull到消息总数=PullBatchSize*监听队列数

源码分析

消费者拉取消息时

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中

会执行

this.pullAPIWrapper.pullKernelImpl(
               pullRequest.getMessageQueue(),
               subExpression,
               subscriptionData.getExpressionType(),
               subscriptionData.getSubVersion(),
               pullRequest.getNextOffset(),
               this.defaultMQPushConsumer.getPullBatchSize(),
               sysFlag,
               commitOffsetValue,
               BROKER_SUSPEND_MAX_TIME_MILLIS,
               CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
               CommunicationMode.ASYNC,
               pullCallback
           );

其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。

3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。

源码分析:

还是在消费者拉取消息成功时

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                   pullResult.getMsgFoundList(),
                                   processQueue,
                                   pullRequest.getMessageQueue(),
                                   dispatchToConsume);

通过consumeMessageService执行

默认情况下是并发消费

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest

@Override
   public void submitConsumeRequest(
       final List<MessageExt> msgs,
       final ProcessQueue processQueue,
       final MessageQueue messageQueue,
       final boolean dispatchToConsume) {
       final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
       if (msgs.size() <= consumeBatchSize) {
           ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
           try {
               this.consumeExecutor.submit(consumeRequest);
           } catch (RejectedExecutionException e) {
               this.submitConsumeRequestLater(consumeRequest);
           }
       } else {
           for (int total = 0; total < msgs.size(); ) {
               List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
               for (int i = 0; i < consumeBatchSize; i++, total++) {
                   if (total < msgs.size()) {
                       msgThis.add(msgs.get(total));
                   } else {
                       break;
                   }
               }
               ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
               try {
                   this.consumeExecutor.submit(consumeRequest);
               } catch (RejectedExecutionException e) {
                   for (; total < msgs.size(); total++) {
                       msgThis.add(msgs.get(total));
                   }
                   this.submitConsumeRequestLater(consumeRequest);
               }
           }
       }
   }

其中consumeExecutor初始化

this.consumeExecutor = new ThreadPoolExecutor(
           this.defaultMQPushConsumer.getConsumeThreadMin(),
           this.defaultMQPushConsumer.getConsumeThreadMax(),
           1000 * 60,
           TimeUnit.MILLISECONDS,
           this.consumeRequestQueue,
           new ThreadFactoryImpl("ConsumeMessageThread_"));

对象线程池最大和核心线程数。对于顺序消费ConsumeMessageOrderlyService也会使用最大和最小线程数这两个参数,只是消费时会锁定队列。

以上三种情况:是针对参数配置,来调整消费速度。

除了这三种情况外还有两种服务部署情况,可以调整消费速度:

4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数

可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:修改tocpic的逻辑队列数量

5.消费端节点部署数量 :

部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。

消费延时控流

针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:

单节点部署:

ConsumInterval :延时时间单位毫秒

ConcurrentThreadNumber:消费端线程数量

MaxRate :理论每秒处理数量

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理论上可以将并发消费控制在 200 以下

如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。

如下延时流控代码:

/**
    * 测试mq 并发 接受
    */
   @Component
   @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
   class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
       @SneakyThrows
       @Override
       public void onMessage(LikeWritingParams params) {
           System.out.println("睡上0.1秒");
           Thread.sleep(100);
           long begin = System.currentTimeMillis();
           System.out.println("mq消费速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
           //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
           long end = System.currentTimeMillis();
         //  System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
       }
       @Override
       public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
           defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
           defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
           defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
           defaultMQPushConsumer.setPullBatchSize(32);     //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
           defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
           defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
           defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
       }
   }

注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。

但是要注意实际操作中并发流控实际是默认存在的,

spring boot 消费端默认配置

this.consumeThreadMin = 20;

this.consumeThreadMax = 20;

this.pullInterval = 0L;

this.pullBatchSize = 32;

若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000

这里默认拉去的速度1s内远大于1000

注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000

所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义

使用rokcetMq支持的延时消息也可以实现消息的延时消费,通过对delayLevel对应的时间进行配置为我们的需求。为不同的消息设置不同delayLevel,达到延时消费的目的。

来源:https://blog.csdn.net/qq_37436172/article/details/128669909

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com