基于rocketmq的有序消费模式和并发消费模式的区别说明
作者:从心归零 发布时间:2021-10-29 08:41:02
rocketmq消费者注册监听有两种模式
有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。
MessageListenerOrderly
正确消费返回
ConsumeOrderlyStatus.SUCCESS
稍后消费返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently
正确消费返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消费返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
顾名思义,有序消费模式是按照消息的顺序进行消费,但是除此之外,在实践过程中我发现和并发消费模式还有很大的区别的。
第一,速度,下面我打算用实验来探究一下。
使用mq发送消息,消费者使用有序消费模式消费,具体的业务是阻塞100ms
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
logger.info("==========CONSUME_START===========");
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();//在第一次消费时初始化
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
消费100条消息
速度挺快的,为了让结果更准确,将消息加到1000条
消费1000条消息
可以看到每一条消息平均耗时25ms,然而业务是阻塞100ms,这说明有序消费模式和同步消费可能并不是一回事,那如果不阻塞代码我们再来看一下结果
不阻塞过后速度明显提高了,那么我阻塞300ms会怎么样呢?
时间相比阻塞100ms多了2倍
接下来我们测试并发消费模式
Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
if(date1 == null)
date1 = new Date();
Thread.sleep(100);
logger.info("total:"+(++total));
date2 = new Date();
totalTime = (date2.getTime() - date1.getTime());
logger.info("totalTime:"+totalTime);
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
基于上次的经验,同样测试三种情况,消费1000条不阻塞,消费1000条阻塞100ms,消费1000条阻塞300ms
消费1000条不阻塞的情况
和有序消费模式差不多,快个一两秒。
消费1000条阻塞100ms
竟然比不阻塞的情况更快,可能是误差把
消费1000条阻塞300ms
速度稍慢,但是还是比有序消费快得多。
结论是并发消费的消费速度要比有序消费更快。
另一个区别是消费失败时的处理不同,有序消费模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消费者会立马消费这条消息,而使用并发消费模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要过好几秒甚至十几秒才会再次消费。
我是在只有一条消息的情况下测试的。更重要的区别是,
返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不会增加消息的消费次数,mq消息有个默认最大消费次数16,消费次数到了以后,这条消息会进入死信队列,这个最大消费次数是可以在mqadmin中设置的。
mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3
我测试后发现,并发模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一个消息到达最大消费次数之后就不会再出现了。这说明有序消费模式可能并没有这个机制,这意味着你再有序消费模式下抛出固定异常,那么这条异常信息将会被永远消费,并且很可能会影响之后正常的消息。下面依然做个试验
Map<String, Integer> map = new HashMap<>();//保存消息错误消费次数
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
if(1 == 1)
throw new Exception();
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
MessageExt msg = msgs.get(0);
if(map.containsKey(msg.getKeys())) {//消息每消费一次,加1
map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
}else {
map.put(msg.getKeys(), 1);
}
logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
发送了十条消息
可以看到虽然我发了十条消息,但是一直在消费同样四条消息,这可能跟消息broker有默认四条队列有关系。同时从时间可以看到,消费失败后,会马上拉这条信息。
至于并发消费模式则不会无限消费,而且消费失败后不会马上再消费。具体的就不尝试了。
结论是有序消费模式MessageListenerOrderly要慎重地处理异常,我则是用全局变量记录消息的错误消费次数,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS。
突然想到之前测试有序消费模式MessageListenerOrderly的时候为什么1000条消息阻塞100ms耗时25000ms了,因为有序消费模式是同时拉取四条队列消息的,这就对上了。
来源:https://blog.csdn.net/qq_36804701/article/details/81481343


猜你喜欢
- 1. 在原有工程目录右键-> new ->Module->:2. 选择library:3. 一路next,最后finish
- Annotation(注解)是JDK1.5及以后版本引入的。它可以用于创建文档,跟踪代码中的依赖性,甚至执行基本编译时检查。注解是以&
- 模式虽然精妙,却难完美,比如观察者模式中观察者生命周期的问题;比如访问者模式中循环依赖的问题等等;其它很多模式也存在这样那样的一些不足之处,
- 前言今天给大家带来一个新的控件–轮播图,网上已经有很多这类的博客来讲解如何实现的,那么我的这个有哪些特点呢?或是说有哪些不同呢?满足了轮播图
- using System;using System.Collections.Generic;using System.ComponentMo
- 目录引言编译环境及说明图片素材分割事件处理OnPaint事件鼠标交互事件代码汇总引言我们有时候会在程序的文件夹里看见一些图标,而这些图标恰好
- 需求:传入多个 id 查询用户信息,用下边两个 sql 实现:SELECT * FROM USERS WHERE username LIKE
- C++虚类相当于java中的抽象类,与接口的不同之处是:1.一个子类只能继承一个抽象类(虚类),但能实现多个接口2.一个抽象类可以有构造方法
- SpringBoot的自动装配是拆箱即用的基础,也是微服务化的前提。这次主要的议题是,来看看它是怎么样实现的,我们透过源代码来把握自动装配的
- 本文实例为大家分享了java读取excel文件的具体代码,供大家参考,具体内容如下方式一:借用package com.ij34.util;/
- FeignClient设置动态Url1. 需求描述一般情况下,微服务内部调用都是通过注册中心,eureka,zookeeper,nacos等
- 今天收到了Android Studio3.0更新推送,在升级过程中遇到几个问题,在这里把问题和解决方法记录下,方便要升级的童鞋。如果还有童
- logback自定义指定日志文件存储目录1、正常使用定义一个logback.xml配置文件即可:<?xml version="
- 介绍java中Pair在这篇文章中,我们讨论了一个非常有用的编程概念,配对(Pair)。配对提供了一种方便方式来处理简单的键值关联,当我们想
- 包括了写入和读取功能. 写入的时候, 如果文件不存在会自动创建. 如果对应的键已经存在, 则自动覆盖它的值. 读取的时候, 如果对应的文件不
- GradientTextViewGithub点我一个非常好用的库,使用kotlin实现,用于设置TexView的字体 渐变颜色、渐变方向 和
- 本文介绍了Java开发过程中日期相关操作,分享的代码如下:package jse;import java.io.UnsupportedEnc
- 1.微信配置信息 global.properties2.方法wxpay用于生成预支付订单信息方法notifyWeiXinPay用于微信支付成
- 假如是在同一台机器上开发,前后端分离的工程中出现跨域问题的原因是,前端工程和后端工程运行在不同的端口上。只要协议、域名、端口有一个不同就会产
- 短信链接跳转APP平时我们会收到广告短信,比如某东,某宝,里面附加着链接,当你点开链接(手机自带的浏览器),发现浏览器打开后,等一下下,就会