kafka并发写大消息异常TimeoutException排查记录
作者:kl 发布时间:2023-11-27 23:07:11
前言
先简单介绍下我们的使用场景,线上5台Broker节点的kafka承接了所有binlog订阅的数据,用于Flink组件接收数据做数据中台的原始数据。昨儿开发反馈,线上的binlog大量报错,都是kafka的异常,而且都是同一条topic抛的错,特征也很明显,发送的消息体非常大,主观判断肯定是写入大消息导致的超时了,异常详情如下:
thread: kafka-producer-network-thread | producer-1
throwable: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for BIN-LOG-DATA-center-dmz2-TABLE-kk-data-center-ods_contract_finance_info-0 due to 56352 ms has passed since last append
定位异常点
应用抛一个不常见的异常,一般操作是先去百度or谷歌搜索一番的,就上面这个timeout超时的异常,搜索引擎的结果都是producer连不上Borker导致的问题,根本不是我们这个场景的,所以其次我们就需要从源码中寻找答案了。博主使用的开发工具是IDEA,借助IDEA很容易定位到异常抛出点。首先定位TimeoutException异常类,然后按住ctrl键,点击这个类,会出现如下图所有抛TimeoutException异常的点,然后根据异常message内容,寻找相匹配的点击进去就是抛异常的地方了,如图,红色箭头所指即代码位置:
分析抛异常的逻辑
程序中的异常,一定是符合某些条件才会抛出的,想要解决异常,只要让运行时的环境不满足抛异常的条件即可,下面就是抛异常的代码:
boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
boolean expired = expiryErrorMessage != null;
if (expired)
abortRecordAppends();
return expired;
}
可以看到,我们的异常是在第一个逻辑判断时候就满足了所以抛异常了。在此处有可能会抛出三个不同的timeout异常,用中文语义翻译条件分别是:
没设置重试,并且发送批次(batch.size)满了,并且配置请求超时时间(request.timeout.ms)小于【当前时间减去最后追加批次的时间】
没设置重试,并且配置请求超时时间(request.timeout.ms)小于【创建批次时间减去配置的等待发送的时间(linger.ms)】
设置重试,并且配置请求超时时间(request.timeout.ms)小于【当前时间-最后重试时间-重试需要等待的时间(retry.backoff.ms)】
上面括号中的参数就是kafka producer中配置的相关的参数,这些参数都没有重新设置过,batch.size默认是10kb大小,而引发报错的消息都是36kb的大小,默认的request.timeout.ms超时设置是30s,所以在这个判断可能过期了的方法中,引发我们异常的主要原因是batch.size和request.timeout.ms的参数设置问题了。
真实原因-解决方案
从上面代码看表面原因是参数设置不够了,实际上呢,博主使用kafka-test启动了五个Borker集群做复现验证测试,测试写入相同的36kb的message,在所有配置也保持默认的情况下,也根本毫无压力。后面查找相关的错误日志,发现所有的TimeoutException集中在几乎同一时刻,经查明,是因为业务批量导入了数据到mysql中,造成binlog消息突然增加,高并发的往kafka写大消息导致Borker处理不过来,造成的TimeoutException超时,所以真正解决问题也可以从两个方面入手:
服务端:增加Borker,并设置多个TopicPartition,平摊写入压力,这个是根本的解决问题
客户端:加大request.timeout.ms、batch.size参数,或者开启消息重试,这种方案治标不治本,但是也能大概率的减少因为此类场景导致的TimeoutException
来源:http://www.kailing.pub/article/index/arcid/259.html
猜你喜欢
- 本文主要给大家介绍了关于Java8中Optional类型和Kotlin中可空类型使用的相关内容,分享出来供大家参考学习,下面话不多说了,来一
- 注册中心呢 就是springcloud的一个核心组件 所有微服务的基石 微服务的核心思想就是分布式 所有的服务分开管理 但这些服务分开后该如
- 效果展示在实际项目当中我们经常看到如下各种剪裁形状的效果,Flutter 为我们提供了非常方便的 Widget 很轻松就可以实现,下面我们来
- 1、为什么使用缓存  我们知道内存的读取速度远大于硬盘的读取速度。当需要重复地获取相同数据时,一次一次地
- 字段策略 0:”忽略判断”,1:”非 NULL 判断”),2:”非空判断”问题描述:当字段策略为 0 “忽略判断” 的时候,如果实体和数据库
- 在Java中,可以通过Runtime类或ProcessBuilder类来实现调用外部程序。Runtime类与ProcessBuilder类使
- 前言这里主要简单介绍如何使用Camera+SurfaceView自定义相机拍照,如果是Camera2或者是TextureView的可以前往主
- Android 应用签名的两种方法一、使用pem签名 (一) apk签名命令java –jar sign
- 最近要做一个网站,要求实现验证码程序,经过不断调试,终于成功实现功能。一、验证码生成类生成验证码的话需要用到java的Graphics类库,
- 在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。
- 一、判断一个字符串str不为空的方法有:1、str == null;2、"".equals(str);3、str.len
- 一.HashMap 和Hashtable 的区别我们先看2个类的定义 public class Hashtable exten
- 这段时间用到了QT的TCP通信,做了初步的学习与尝试,编写了一个客户端和服务器基于窗口通信的小例程。使用QT的网络套接字需要.pro文件中加
- 服务降级服务压力剧增的时候,根据当前的业务情况及流量对一些服务和页面有策略的降级,以此缓解服务器的压力,以保证核心任务的进行。同时保证部分甚
- 路由做Android/iOS原生开发的时候,要打开一个新的页面,你得知道你的目标页面对象,然后初始化一个Intent或者ViewContro
- 前言在使用Java开发接口请求中,我们需要对请求进行进行统一返回值,这时候我们自己封装一个统一的Result返回类,下面就介绍下我用的这种的
- 本文实例讲述了Java单例模式下的MongoDB数据库操作工具类。分享给大家供大家参考,具体如下:我经常对MongoDB进行一些基础操作,将
- 对于因为编程错误而导致的异常,或者是不能期望程序捕获的
- JDK 1.5开始提供ScheduledThreadPoolExecutor类,ScheduledThreadPoolExecutor类继承
- 合成聚合复用原则合成复用原则又称为组合/聚合复用原则(Composition/Aggregate Reuse Principle, CARP