结合线程池实现apache kafka消费者组的误区及解决方法
作者:字母哥哥 发布时间:2023-08-06 15:40:31
标签:线程池,apache,kafka,消费者
一个错误:多线程使用单一消费者
下图显现了一种错误的使用KafkaConsumer的方法
创建多个线程用来消费kafka数据
多线程使用同一个KafkaConsumer对象
在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。
这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。
一个误区:多线程就是消费者组
下图中体现的是一种正常的KafkaConsumer使用方式
使用一个KafkaConsumer拉取数据
拉取数据后将一个批次的数据交给一个线程去处理
这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。
常规正确做法:使用线程池实现消费者组
下面的方法是常规的正确实现方式
因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
每个线程持有一个KafkaConsumer对象
多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
public void groupConsumer(){
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
MyConsumer myConsumer = new MyConsumer();
executorService.execute(myConsumer);
}
}
}
MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)
@Override
public void run() {
MyConsumer myConsumer = new MyConsumer();
myConsumer.pollData();
}
来源:https://blog.csdn.net/hanxiaotongtong/article/details/125646483


猜你喜欢
- Java 散列存储Java中散列存储的数据结构主要是指HashSet、HashMap、LinkedHashSet、LinkedHashMap
- 在ios手机上经常看到页面上下滑动回弹效果,安卓中没有原生控件支持,这里自己就去自定义一个scrollview实现回弹效果1. 新建MySc
- 1.统计字符串字母个数(并且保持字母顺序)比如: aabbbbbbbba喔喔bcab cdabc deaaa目前我做知道的有5种方式噢,如果
- 项目需要对接外部接口,将图片文件流发送到外部接口,下面代码就是HttpsURLConnection如何上传文件流:/** *
- 什么是OKHttp一般在Java平台上,我们会使用Apache HttpClient作为Http客户端,用于发送 HTTP 请求,并对响应进
- 在实际编程中,往往存在着这样的“数据集”,它们的数值在程序中是稳定的,而且“数据集”中的元素是有限的。例如星期一到星期日七个数据元素组成了一
- 前言上两篇文章对安卓自定义view的事件分发做了一些应用,但是对于自定义view来讲,并不仅仅是事件分发这么简单,还有一个很重要的内容就是v
- 1.项目介绍这是一款基于 Java 开发的移动端安卓小游戏——大家来拼图2.项目原理把选定的一张图片
- 抽象类1.引出抽象类向上转型带来的最大的好处就是参数统一化,使用共同的父类引用,就可以接收所有的子类实例。多态非常依赖方法覆写,但是子类可以
- 本文实例讲述了C#图像处理之边缘检测(Smoothed)的方法。分享给大家供大家参考。具体如下://定义smoothed算子边缘检测函数pr
- 直接调用HashKit.sha1(String str)方法就可以了,,返回的是16进制的字符串长度是40,也就是用md.digest()方
- 1.定义字符串字符串常见的构造方式如下:String s1 = "with";String s2 = new Strin
- 一、简介在现阶段的Android开发中,注解越来越流行起来,比如ButterKnife,Retrofit,Dragger,EventBus等
- 本篇主要讲解SpringBoot当中使用Servlet三大组件,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学
- 使用 try/catch 处理异常try-catch 块的用途是捕捉和处理工作代码所生成的异常。 有些异常可以在 catch 块中处理,解决
- 本文实例讲述了C#实现的WINDOWS登录功能。分享给大家供大家参考,具体如下:using System;using System.Data
- 本文实例为大家分享了OpenGL实现多段Bezier曲线拼接的具体代码,供大家参考,具体内容如下运行程序的交互方式有点类似corelDraw
- 1.Jquery验证1)引入头文件<script src="../../Scripts/jquery-1.7.1.js&qu
- 一、引言Good Good Study,Day Day Up,童鞋点个关注,不迷路,么么哒~~~MP自带的条件构造器虽然很强大,有时候也避免
- 概述基于java + swing + JFrame 的图书馆管理系统,租车,还车,管理员管理用户,付款等。部分代码public class