消息队列-kafka消费异常问题
作者:秃头披风侠_ 发布时间:2023-02-02 07:08:05
标签:kafka,队列,异常
概述
在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了。比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费。下面笔者给出解决方案。
重试一定次数(消息丢失)
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
public void test6(String msg){
businessProcess(msg);
}
private void businessProcess(String msg){
System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
说明:如果读者使用的是java客户端,也就是spring进行实现,那么在不做任何处理的情况下,会自动重试10次,然后消息会被直接处理掉。也就是说如果你的业务允许消息丢失,那么你不需要额外的编码处理
加入到死讯队列(消息不丢失)
消费端代码:
//1.启用手动提交offset
//2.配置errorHandler,用来加入到死讯队列
//3.不管业务处理是否处理异常还是正常都提交offset
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
public void test6(String msg,Acknowledgment ack){
try {
businessProcess(msg);
}finally {
//手动提交
ack.acknowledge();
}
}
//1.专门处理死讯队列消息,都是topicName+.DLT的主题
//2.死讯队列里,只有消费成功的才提交offset,否则等待bug修复完上线,继续处理
@KafkaHandler
@KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
public void test7(String msg,Acknowledgment ack){
try {
businessProcess(msg);
ack.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
//业务代码
private void businessProcess(String msg){
System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
异常处理器
//1.向容器注册一个KafkaListenerErrorHandler类型的bean
//2.该bean就是当处理消息异常的时候,将消息加入到.DLT主题中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_DLT=".DLT";
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
System.out.println("消费失败消息:"+message.toString());
//获取消息处理异常主题
MessageHeaders headers = message.getHeaders();
String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
//放入死讯队列
kafkaTemplate.send(topic,message.getPayload());
return message;
}
}
效果图:
说明:以上基本上就是使用死讯队列的方案,也许读者会觉得这样编码复杂度很高,但其实不用担心,其实上面这些代码基本上是使用死讯队列的模板代码,在成熟一点的公司,一般会使用上述代码进行简单封装,这里笔者给个思路,有兴趣同学可以实现一下。我们其实可以使用aop思想,进行自定义一个@EnableDLT这样的注解去实现,这样上面这个方案使用起来是不是就简单优雅了。之前笔者在开发过程中使用过亚马逊的消息队列服务,也不过是这样实现罢了。
来源:https://blog.csdn.net/m0_58554082/article/details/117927021


猜你喜欢
- 本文实例为大家分享了Android获取设备传感器的具体代码,供大家参考,具体内容如下结果示例:xml代码:<?xml version=
- 刚学完JDBC不久,做了一个简单的学生管理系统,可能还有不完善的地方,望各路大神见谅。废话不多说,我先贴个图让大家让大家瞅瞅,觉得是你想要的
- 什么是AOPAOP是 Aspect Oriented Programming 的缩写,即面向切面编程,和平常遇到的面向对象OOP编程不一样的
- 这是一个可以从乱码文本中得到正确的原始文本的程序,其基于的原理在于错误的编码往往导致位补充,因此正确的文本使用的字节数应该是最少的(之一)。
- Spring MVC Controller控制器,是MVC中的部分C,为什么是部分呢?因为此处的控制器主要负责功能处理部分:收集、验证请求参
- 前言AOP(Aspect Oriented Programming),即面向切面编程,是Spring框架的大杀器之一。首先,我声明下,我不是
- 概述现实生活中,我们常会看到这样的一种集合:IP地址与主机名,身份证号与个人,系统用户名与系统用户对象等,这种一一对应的关系,就叫做映射。J
- 调用微信接口前需要准备的内容。1.微信公众平台的appid2.微信公众平台的secret3..获取tokenid4.获取ticket5.生成
- Kotlin中StateFlow的使用StateFlow 是 Flow 的实现,是一个特殊的流,默认的 Flow 是冷流,而StateFlo
- 1.前言热修复一直是这几年来很热门的话题,主流方案大致有两种,一种是微信Tinker的dex文件替换,另一种是阿里的Native层的方法替换
- C#控制台程序使用Log4net日志组件,供大家参考,具体内容如下1、Log4net一般都不陌生,但是在配置上不同类型的项目又不相同的地方比
- 前言本文主要讲述如何在同一个窗体内,实现不同功能模块的页面切换。一、准备工作1.搭建一个简单的mvvm项目结构首先搭建一个简单的项目框架,然
- 实习一段时间了,一直想写点技术总结,但一直没找到合适的主题。刚好,最近版本中我负责的模块遇到了个线程相关问题(之前一直画界面,做点基础功能,
- DBUtils工具包一.介绍DBUtils是Apache组织开源的数据库工具类。二.使用步骤①.创建QueryRunner对象②.调用upd
- 一.前言在以前的项目中,很少去关注spring aop的具体实现与理论,只是简单了解了一下什么是aop具体怎么用,看到了一篇博文写得还不错,
- 1 场景调用多个平级服务,按照服务优先级返回第一个有效数据。具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个
- 一、JSON格式介绍JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。相对于另一种数据交换格式
- 说明这里只以 servlet 为例,没有涉及到框架,但其实路径的基本原理和框架的关系不大,所以学了框架的同学如果对路径有疑惑的也可以阅读此文
- 1. 概述官方JavaDocsApi: javax.swing.JComboBoxJComboBox,下拉列表框。JComboBox以下列列
- 最近正在学习使用Android Studio,发现默认的Hello World程序界面和我们