Spring Boot MQTT Too many publishes in progress错误的解决方案
作者:??剑圣无痕???? 发布时间:2023-06-27 02:16:44
前言
最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。
项目中采用的是Spring Boot2.0集成的MQTT引入的版本为1.2.0,消息发送用的是MessagingGateway的方式实现,不熟悉的朋友可以查看这篇文章Spring boot 集成 MQTT详情
原因分析
出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义
0 最多一次的传输
发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。
1 至少一次的传输
发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.
2 只有一次的传输
Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。
项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:
当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。
源码分析
关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。
MQTT的Push消息到缓存中时序图
MqttPahoMessageHandler的publish方法
说明: checkConection检查连接后,在发送消息。
MqttAsyncClient的publish方法
ClientComms的internalSend方法
ClientState的send方法
MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0
SaveToken的源码实现如下:
通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。
异步发送消息时序图
说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。
ClientComms的conncect方法
ConnectBG的run方法
CommsSender的run方法
1.从clientState中获取消息 2.通过消息id去hashtable中获取缓存消息 3.消息不为空,执行消息发送 4.调用notifySent方法删除消息,且actualInFlight执行递减操作。
CommsSender的notifySent方法
小结
在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。
//存放待发送消息的Vector数组
volatile private Vector pendingMessages;
解决方案
方案1:发送消息时设置为Qos=1
此方案虽然可以解决此问题,但存在如下的缺点:
网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。
发送消息需要进行消息确认,网络资源消耗过大。
方案2: 修改maxInflight的默认值,例如将其修改为50
```
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setMaxInflight(50);
```
此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。
方案3:将消息配置为多客户端模式
由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。
方案4:升级mqtt版本为1.2.1
在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。
引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。
解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。
mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
来源:https://juejin.cn/post/7082033911986913317
猜你喜欢
- 懒加载---就是我们在spring容器启动的是先不把所有的bean都加载到spring的容器中去,而是在当需要用的时候,才把这个对象实例化到
- 本文实例为大家分享了Java实现FTP上传与下载的具体代码,供大家参考,具体内容如下JAVA操作FTP服务器,只需要创建一个FTPClien
- 目录一 为什么要用锁二 synchronized怎么实现的三 CAS来者何人四synchronized和CAS孰优孰劣轻量级锁重量级锁总结提
- 1. 网络爬虫是一个自动提取网页的程序,它为搜索引擎从万维网上下载网页,是搜索引擎的重要组成。传统爬虫从一个或若干初始网页的URL开始,获得
- 废话不多说了,直接给大家贴关键代码了。具体代码如下所示:using System;using System.Collections.Gene
- Android 微信摇一摇功能实现,最近学习传感器,就想实现摇一摇的功能,上网查了些资料,就整理下。如有错误,还请指正。开发环境Androi
- 当键盘敲下后退键(Backspace)后1、禁止浏览器自动后退2、但不影响密码、单行文本、多行文本输入框等的回退操作<script t
- java 交换两个数据的方法1:利用数组,即先把要交换的数字放在数组中 ,比如在一些数组排序中可能用到public static void
- 下面Demo中我使用了2种排序方式1.让Employee继承IComparable 接口,实现CompareTo方法排序2.定义一个方法使用
- 前言之前几篇我们介绍了贝塞尔曲线的原理、绘制曲线和动效实现,这些都是代码预设好的,如果我们要根据需要自行绘制曲线,就需要使用交互来实现了。本
- 一、前言:垃圾回收:在未来的JDK中可能G1会为ZGC所取代先问自己几个问题:什么是垃圾?垃圾就是堆内存中(范指)没有任何指针指向的对象实体
- 一 关键pom<dependencies> <dependency> <groupId>or
- 一、synchronized 有不足新事物的出现要不是替代老事物,要么就是对老事物的补充JUC 的 locks 就是对 synchroniz
- 前言提问:springboot项目,开发环境、测试环境和生产环境配置文件如何分开表示?答:多profile文件方式1、多环境配置(profi
- 如果有人对程序的崩溃原因做下统计的话,那么由于对象为空,但是又访问了对象的某个属性而导致的崩溃,也许会是程序崩溃的第一大原因了。比如我们在使
- 给定一个排序链表,删除所有重复的元素,使得每个元素只出现一次。示例 1:输入: 1->1->2输出: 1->2示例 2:输
- 前言2017 Google IO 大会,宣布将支持Kotlin作为开发语言。自此Kotlin成为了Android开发中的又一官方支持语言,当
- IM SDK API 概述 https://cloud.tencent.com/document/product/269/33543///
- 1.什么是servlet?sun(oracle)公司制订的一种用来扩展web服务器端功能的组件规范。背景:常用的web服务器:apache
- Interceptor讲到Interceptor,相信熟悉struts2的童鞋肯定不会陌生了,struts2可以自定义 * 进行自己想要的一