解决RabbitMq消息队列Qos Prefetch消息堵塞问题
作者:james_searcher 发布时间:2021-11-17 17:36:13
mq是实现代码扩展的有利手段,个人喜欢用概念来学习新知识,介绍堵塞问题的之前,先来段概念的学习。
ConnectionFactory
:创建connection的工厂类
Connection
: 简单理解为socket
Channel
:和mq交互的接口,定义queue、exchange和绑定queue、exhange等接口都是它。
接下来就是和mq的交互类
exchange
:简单地看成路由,类型不是重点,看看官网即可
queue
:客户端监听的是queue,而不是exchange,但是使用queue的前提要先将exchange和queue绑定。用过java queue工具类应该很容易上手,queue分为写和读,各自可以有自己频率,写得快读得慢,容易堵塞;写得慢读得快又容易造成消费者的空闲。
Prefetc
:一个重要却容易被忽略的指标,也是这次遇到的问题。
prefetch与消息投递
prefetch是指单一消费者最多能消费的unacked messages数目。
如何理解呢?
mq为每一个 consumer设置一个缓冲区,大小就是prefetch。每次收到一条消息,MQ会把消息推送到缓存区中,然后再推送给客户端。当收到一个ack消息时(consumer 发出baseack指令),mq会从缓冲区中空出一个位置,然后加入新的消息。但是这时候如果缓冲区是满的,MQ将进入堵塞状态。
更具体点描述,假设prefetch值设为10,共有两个consumer。也就是说每个consumer每次会从queue中预抓取 10 条消息到本地缓存着等待消费。同时该channel的unacked数变为20。而Rabbit投递的顺序是,先为consumer1投递满10个message,再往consumer2投递10个message。如果这时有新message需要投递,先判断channel的unacked数是否等于20,如果是则不会将消息投递到consumer中,message继续呆在queue中。之后其中consumer对一条消息进行ack,unacked此时等于19,Rabbit就判断哪个consumer的unacked少于10,就投递到哪个consumer中。
我遇到的问题是一个粗心的程序员,在编写代码的时候,他对某些消息处理方式是这样的
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.error("######### The message is not delete from queue : {}", body);
}
首先他讲ack机制设置为手动的,然后他的理解是如果处理成功的消息,就ack给MQ,期望MQ就可以删除完成的数据。不然,保留数据再次被处理。
这里的误区就是就是对ack的理解,失败的时候,如果需要让程序继续处理,应该使用basicNack,并告诉mq将消息再次放入队列
if (success) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
对于客户端意外宕机的情况,没有ack服务器确实不会删除掉数据,但是consumer重启以后,对于服务器就是一个新的消费者了,也就是它的缓冲区又被重置为原来的n-prefetch,所以这个问题被粗心的小哥想当然地测试通过了。
prefetch的大小应该为多少
这篇文章给了很好的建议,我简单地说一下我的理解。
理想状况下,计算MQ SERVER 从缓冲区中拿到消息并推送到消费端,加上消费端处理完ack消息到MQ server,的时间,假设为100ms,其中消费端处理业务话费了10ms。
这里可以得出我们 prefetch = 100ms / 10ms = 10,也就是消息来回的总时间/业务处理的时间,这里要求我们 prefetch >= 10。一般计算这个时间不会太准确只能毛姑姑的,所以prefetch一般要大一点。但是这个值也不能太大,不然消费端就一只处于空闲状态了。
所以如果你保证所有的消息都ack了,但是还是出现比较长时间的堵塞,你就或者加大一点prefetch,或者多加一些机器,或者减少业务处理的时间了。一开始建议采用或者,使用一个线程池来处理这些业务逻辑。
来源:https://blog.csdn.net/james_searcher/article/details/70308565


猜你喜欢
- 手机二维码扫码登录已经成为了现代互联网时代的一种普遍的登录方式。它的出现,极大地方便了用户登录的流程,减少了用户输入用户名和密码的麻烦。在二
- Java jar打包成exe应用程序,可在无JDK/JRE环境下运行老师让做一个小项目,但是需要打包发布出来,因此在网上查了很多资料之后总结
- 今天想和小伙伴们来聊一聊 Spring Security 中的角色继承问题。角色继承实际上是一个很常见的需求,因为大部分公司治理可能都是金字
- 概念在Java中,对象的生命周期包括以下几个阶段:创建阶段(Created)应用阶段(In Use)不可见阶段(Invisible)不可达阶
- 本文实例为大家分享了Android实现歌词滚动效果的具体代码,供大家参考,具体内容如下自定义TextViewpublic class Ver
- 前言想必大家对c语言的动态内存分配并不陌生,忘了的小伙伴也可以看看我的这篇文章C语言动态内存分配c语言的动态内存分配由于有些地方用起来比较麻
- 本文实例讲述了C#生成单页静态页简单实现方法。分享给大家供大家参考。具体方法如下:protected void BtGroup_Server
- 这里介绍两种方法,一种是Spannable,一种是Html.fromHtml(通过html标签来改变),实际中看您使用哪种方便选择使用即可1
- 一、Java后端使用MultipartFile@PostMapping(value = "/upload")  
- 本文实例讲述了Android基本游戏循环。分享给大家供大家参考。具体如下:// desired fpsprivate final stati
- 前言OkHttp是目前非常火的网络库,支持HTTP/2,允许所有同一个主机地址的请求共享同一个socket连接,连接池减少请求延时,透明的G
- 一、Jsoup的主要功能Jsoup是一款Java 的HTML解析器,可直接解析某个URL地址、HTML文本内容。它提供了一套非常省力的API
- 一、java内存区域Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。这些区域都有各自的用途,以及创建和
- 1.object.toString()方法这种方法要注意的是object不能为null,否则会报NullPointException,一般别
- 这篇文章主要介绍了MyBatis Mapper接受参数的四种方式代码解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考
- jedis是redis的java客户端,spring将redis连接池作为一个bean配置。redis连接池分为两种,一种是“redis.c
- 前言众所周知,在多个项目中可能会相同的模块,如果每个项目都去创建一遍的话,这样开发效率会很低。比如在开发一个APP应用的时候,有供APP使用
- 用java来打包文件生成压缩文件,有两个地方会出现乱码:内容的中文乱码问题:修改sun的源码。使用开源的类库org.apache.tools
- 最近有人问我如何实现倒计时的按钮功能,例如发送验证码,我记得有个CountDownTimer,因为好久没用过了,自己就写了一个,代码如下 n
- 概述在学习Spring的时候,在了解基本用法的时候,如果有时间一定要深入源码了解Spring的底层原理,这样在做一些适配工作、写一些轮子的时