结合线程池实现apache kafka消费者组的误区及解决方法
作者:字母哥哥 发布时间:2023-08-06 15:40:31
标签:线程池,apache,kafka,消费者
一个错误:多线程使用单一消费者
下图显现了一种错误的使用KafkaConsumer的方法
创建多个线程用来消费kafka数据
多线程使用同一个KafkaConsumer对象
在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。
这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。
一个误区:多线程就是消费者组
下图中体现的是一种正常的KafkaConsumer使用方式
使用一个KafkaConsumer拉取数据
拉取数据后将一个批次的数据交给一个线程去处理
这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。
常规正确做法:使用线程池实现消费者组
下面的方法是常规的正确实现方式
因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
每个线程持有一个KafkaConsumer对象
多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
public void groupConsumer(){
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
MyConsumer myConsumer = new MyConsumer();
executorService.execute(myConsumer);
}
}
}
MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)
@Override
public void run() {
MyConsumer myConsumer = new MyConsumer();
myConsumer.pollData();
}
来源:https://blog.csdn.net/hanxiaotongtong/article/details/125646483
0
投稿
猜你喜欢
- 前言在java Thread类中,我们会看到interrupt()、interrupted()及isInterrupted(),在大多数情况
- 什么是volatile关键字volatile是Java中用于修饰变量的关键字,其可以保证该变量的可见性以及顺序性,但是无法保证原子性。更准确
- 背景后台系统需要接入 企业微信登入,满足企业员工快速登入系统流程图简单代码说明自定义一套 springsecurity 认证逻辑主要就是 根
- 如图所示的效果相信大家都不陌生,我们可以使用很多种方法去实现此效果,这里自己采用CountDownTimer定时器简单封装下此效果,方便我们
- 讲这个例子前,咱们先来看一个简单的程序:字符串数组实现数字转字母:#include <stdio.h>#include <
- 微服务治理Spring Cloud 工具套件为微服务治理提供了全面的技术支持。这些治理工具主要包括服务的注册与发现、负载均衡管理、动态路由、
- android线程消息机制主要由Handler,Looper,Message和MessageQuene四个部分组成。平常在开发中,我们常用来
- 自动登录是我们在软件开发时一个非常常见的功能,例如我们登录 QQ 邮箱:很多网站我们在登录的时候都会看到类似的选项,毕竟总让用户输入用户名密
- 多数据源创建数据库CREATE DATABASE mybatis_plus_1;USE mybatis_plus_1;CREATE TABL
- 前言Java8 由Oracle在2014年发布,是继Java5之后最具革命性的版本。Java8吸收其他语言的精髓带来了函数式编程,lambd
- 静态库和动态库的区别1、静态库的扩展名一般为".a"或者".lib";动态库的扩展名一般为"
- 一、介绍在实际的软件项目开发过程中,我可以很负责任的跟大家说,如果你真的实际写代码的时间超过5年,你对增删改查这类简单的功能需求开发,可以说
- 一、前言(吐槽+煽情+简介) &n
- 为什么要使用路由在之前我们的代码中,页面跳转使用的代码如下所示:Navigator.of(context).push( Mate
- 有时候,我们在同一个activity里面有很多fragment,在横竖屏的时候,有些fragment要求重新加载数据,有些不需要,如何简单控
- 示例 1 :使用搜索表单创建全屏模式我们要构建的小应用程序有一个应用程序栏,右侧有一个搜索按钮。按下此按钮时,将出现一个全屏模式对话框。它不
- 最近的项目中要实现一个聊天的功能,类似于斗鱼TV的聊天室功能,与服务器端人商量后决定用WebSocket来做,但是在这之前我只知道Socke
- 一,FileWritter写入文件FileWritter, 字符流写入字符到文件。默认情况下,它会使用新的内容取代所有现有的内容,然而,当指
- 最近“全网域(Web Scale)”一词被炒得火热,人们也正在通过扩展他们的应用程序架构来使他们的系统变得更加“全网域”。但是究竟什么是全网
- 自从SEOTcs系统11月份24日更新了一下SEO得分算法以来,一直困扰我的一个问题出现了,java的数据job任务,在执行过程中会经常报以