kafka监听问题的解决和剖析
作者:知晓汝名,吓吾一跳! 发布时间:2021-06-28 04:41:25
目录
问题如下:
一. 解决问题一(kafka监听不到数据)
二. 解决问题二(kafka为什么会有重复数据发送)
三. 解决问题三(kafka数据重复如何解决)
四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)
五. 粘一下我的监听配置文件
总结
问题如下:
kafka为什么监听不到数据
kafka为什么会有重复数据发送
kafka数据重复如何解决
为什么kafka会出现俩个消费端都可以消费问题
kafka监听配置文件
一. 解决问题一(kafka监听不到数据)
首先kafka监听不得到数据,检查如下
检查配置文件是否正确(可能会出现改了监听地址,监听Topic,监听的地址的数量问题)
检查接收数据的正确性(比如原生的代码,可能是用byte序列化接收的数据,而你接收使用String。也是配置文件序列化问题,还有与发送者商量问题)
检查kafka版本问题(一般的版本其实是没什么问题的,只有个别版本会出现监听不到问题)
没有加
@Component 犯了最不应该出差错的问题
如果出现监听不到数据的问题,那么就试试更改方法一二,如果不可以在去试试方法三,之前出现这个问题也是查过 一般查到都会说 “低版本的服务器接收不到高版本的生产者发送的消息”,但是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。
如果按照版本一致,那么根本就不现实,因为可能不同的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求自己维护项目版本升级?如果出现第四种情况就无话可说了。
二. 解决问题二(kafka为什么会有重复数据发送)
重复数据的发送问题如下
可能在发送者的那里的事务问题。mysql存储事务发生异常导致回滚操作,但是kafka消息却是已经发送到了服务器中。此事肯定会出现重复问题
生产者设置时间问题,生产发送设置的时间内,消息没完成发送,生产者以为消费者挂掉,便重新发送一个,导致重复
offset问题,当项目重启,offset走到某一个位置已扔到kafka服务器中,但是项目被重启.那么offset会是在原本重启的那一个点的地方再次发送一次,这是kafka设计的问题,防止出现丢失数据问题
三. 解决问题三(kafka数据重复如何解决)
目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并定期清理
粘贴一下我的排重(Redis排重法)
//kafka prefix
String cache = "kafka_cache";
//kafka suffix
Calendar c = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//0点,目前是为了设置为这一天的固定时间。这个完全可以去写个工具类自己弄,为了看的更清楚,麻烦了一点的写入
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
String gtimeStart = sdf2.format(c.getTime());
long time = sdf.parse(gtimeStart).getTime();
//此位置为了设置是否是新的一天,新的一天需要设置定时时间,保证redis中不会存储太多无用数据
Boolean flag = false;
//数据接收
Set<String> range = new HashSet<>();
//判断是否存在
if (redisTemplate.hasKey(cache + time)) {
//存在则取出这个set
range = redisTemplate.opsForSet().members(cache + time);
}else {
//不存在,则为下面过期时间的设置铺垫
flag = true;
}
//判断监听到的数据是否是重复
if (range.contains("测试需要")) {
//重复则排出,根据逻辑自己修改
continue;
} else {
//添加进去
redisTemplate.opsForSet().add(cache + time, i+"");
if (flag){
//设置为24小时,保证新一天使用,之前使用的存储会消失掉
redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
//不会在进入这个里面,如果多次的存入过期时间,那么这个key的过期时间就永远是24小时,一直就不会过期
flag = false;
}
}
四. 解决问题四(为什么kafka会出现俩个消费端都可以消费问题)
原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每一个组发送一个他所收到的消息,但是两个消费端监听同一个租,那么就只有一个消费端可以消费到。
五. 粘一下我的监听配置文件
# 指定kafka 代理地址,可以多个,用逗号间隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id= test
# 是否自动提交
spring.kafka.consumer.enable-auto-commit= true
# 提交间隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大轮询的次数
spring.kafka.consumer.max-poll-records=1
# 将偏移量重置为最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
来源:https://www.cnblogs.com/honour1207/p/14078121.html


猜你喜欢
- 知识点回顾封装封装(有时称为数据隐藏)是与对象有关的一个重要概念。从形式上来看,封装不过是将数据和行为组合在一个包中,并对对象的使用者隐藏了
- 适用场合: 7.3工厂模式的适用场合 创建新对象最简单的办法是使用new关键字和具体类。只有在某些场合下,创建和维护对象工厂所带来的额外复杂
- 上篇博客我们了解了请求参数的获取,那么获取到请求参数之后,需要对参数进行出来,然后进行数据响应。那么这篇博客我们就来了解 Controlle
- JetBrains 系列产品(IDEA、Pycharm 等)使用本站破解教程 (opens new window),在输入激活码时,部分小伙
- 文件比较平常都是用Beyond Compare,可以说离不开的神器,特别是针对代码比较这块,确实挺好用的。不过Beyond Compare平
- 前言map的迭代删除,和我们常见的list,set不太一样,不能直接获取Iteraotr对象,提供的删除方法也是单个的,根据key进行删除,
- 一、Jvm加载对象在说Java * 之前,还是要说一下Jvm加载对象的过程,这个依旧是理解 * 的基础性原理:Java类即源代码程序.j
- 这两天我写了一个测试c++异常处理机制的例子,感觉有很好的示范作用,在此贴出来,给c++异常处理的初学者入门。本文后附有c++异常的知识普及
- 一般数据库的编码是utf8,utf8是不支持存储表情符的,当存入的微信昵称带有表情符时就会出现乱码情况,有两种解决方法:1.mysql数据库
- C#动态webservice调用接口using System;using System.Collections;using System.I
- 一、创建web项目1、打开idea软件,点击界面上的Create New Project2、进入如下界面。选中 java Enterpris
- 本文实例讲述了C#禁用双击窗体图标关闭窗体的方法。分享给大家供大家参考。具体实现方法如下:[DllImport("user32.d
- 前言本章是关于Java数组的最全汇总,本篇为汇总中篇,主要讲了二维数组和不规则的数组的相关内容。数组是最常见的一种数据结构,它是相同类型的用
- 本文实例讲述了java实现递归文件列表的方法。分享给大家供大家参考。具体如下:FileListing.java如下:import java.
- 在学习Android开发的过程你,你往往会去借鉴别人的应用是怎么开发的,那些漂亮的动画和精致的布局可能会让你爱不释手,作为一个开发者,你可能
- 本文实例为大家分享了java实现简单猜数字的具体代码,供大家参考,具体内容如下代码不多说,图片自加,实现功能有数字比大小,菜单开始,帮助,退
- 本文实例讲述了C#创建windows系统用户的方法。分享给大家供大家参考。具体如下:下面的代码可以通过c#创建一个windows的本地系统账
- 一、JPA介绍JPA是Java Persistence API的简称,中文名Java持久层API,是JDK 5.0注解或XML描述对象-关系
- 本文实例讲述了C#及WPF获取本机所有字体和颜色的方法。分享给大家供大家参考。具体如下:WPF 获取所有的字体:System.Drawing
- 方法1: int d=10; d.ToString("x") //或把x改为X,,,就变成了16位的字符串了. int