流式图表拒绝增删改查之kafka核心消费逻辑上篇
作者:在下uptown 发布时间:2023-04-19 03:32:11
标签:kafka,流式图表,拒绝增删改查
消费逻辑
上文 流式图表框架搭建
框架搭建好之后着手开发下kafka的核心消费逻辑,流式图表的核心消费逻辑就是实现一个消费链接池维护消费者客户端链接,将kafka client封装成Runable任务提交到线程池里做一个常驻线程,实时消费数据,消费到数据后存到redis中,并通过websocket推送到浏览器,浏览器刷新图表实现流式图表功能。
代码设计
按照之前的代码划分,核心逻辑写在matrix-core子模块中,整体结构用maven的父子模块依赖继承的特性管理依赖。
maxtrix-core模块只做kafka client的管理和消费逻辑,尽量轻一点,只需要引入redis和kafka依赖即可。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.uptown</groupId>
<artifactId>matrix-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
反序列化工具、线程池工具、lombok都放到matrix-common中,具体用google的包,这样其他内部模块直接引用common模块即可使用。
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
消费池
首先要创建出一个线程池出来,由于我们的业务要实时监听数据,所以线程池提交的线程必须是个常驻线程。所以需要重写线程池的任务失败策略和异常处理器。
// 自定义异常处理器,捕获错误日志
@Slf4j
public class ConsumerExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error(e.getMessage(), e);
}
}
// 任务失败策略
@Slf4j
class ConsumerThreadPoolExecutor extends ThreadPoolExecutor {
ConsumerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, rejectedExecutionHandler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//若线程执行某任务失败了,重新提交该任务
if (t != null) {
log.error("restart kafka consumer task for {}", (Object) t.getStackTrace());
}
execute(r);
}
}
剩下的创建出线程池即可,消费逻辑中只需要注入到具体类中即可。
@Data
@Component
@Slf4j
public class KafkaConsumerConfig {
// 线程池维护线程的最少数量
@Value(value = "${kafka.core-pool-size:20}")
private int corePoolSize;
// 线程池维护线程的最大数量
@Value(value = "${kafka.max-pool-size:20}")
private int maxPoolSize;
// 线程池维护线程所允许的空闲时间
@Value(value = "${kafka.keep-alive-time:0}")
private int keepAliveTime;
// 线程池所使用的缓冲队列大小
@Value(value = "${kafka.work-queue-size:0}")
private int workQueueSize;
// 统一存放kafka客户端的map
@Bean
public Map<String, KafkaConsumerRunnable> globalKafkaConsumerThreadMap() {
return Maps.newConcurrentMap();
}
/**
* kafka监听任务 线程池
*/
@Bean(name = "defaultThreadPool")
public ThreadPoolExecutor defaultThreadPool() {
// 使用google线程工厂 线程挂掉重启策略
ConsumerExceptionHandler exceptionHandler = new ConsumerExceptionHandler();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d")
.setUncaughtExceptionHandler(exceptionHandler).build();
return new ConsumerThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(maxPoolSize),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
这么搞的主要原因是防止消费线程中出现消费异常,比如反序列化异常、客户端监听网络异常等,为啥不在任务中try catch住异常是因为这样做更优雅点,让kafka client和线程的生命绑定一块,比较好管理。
统一存放kafka客户端的map算是做一个统计,统计内存中已提交的kafka监听线程数,具体的Runable任务放在下一篇提供,毕竟上班写文章容易翻车。
来源:https://juejin.cn/post/7220235452291416123


猜你喜欢
- 本文基于jdk1.8进行分析。LinkedList和ArrayList都是常用的java集合。ArrayList是数组,Linkedlist
- 在实现下拉框的基础上进行二级联动(这个项目有bug添加可以完成,但是修改获取不到对应的值,这个问题解决以后我会在发布一篇文章)JS部分//二
- 本文实例讲述了C#实现的三种模拟自动登录和提交POST信息的方法。分享给大家供大家参考,具体如下:网页自动登录(提交Post内容)的用途很多
- 一、引言在许多编程语言中,都有函数回调这一概念。C 和 C++ 中有函数指针,因此可以将函数作为参数传给其它函数,以便过后调用。而在 Jav
- 小米系统自带的长截屏应该很多人都用过,效果不错。当长截屏时listview就会自动滚动,当按下停止截屏时,就会得到一张完整的截屏。该篇就介绍
- 自用项目中统一Eclipse格式化Java、JavaScript、JSP、HTML代码设置1.Window->Preferences
- 前言LocalDateTime、LocalDate、LocalTime 是 Java8 全新的日期框架,加强了对时间的管理,有很多特别好用的
- 有时候一些项目并不需要提供 Web 服务,例如跑定时任务的项目,如果都按照 Web 项目启动未免画蛇添足浪费资源为了达到非 Web 运行的效
- 本文导读中秋节是中国民间的传统节日,中秋节源自天象崇拜由上古时代秋夕祭月演变而来。中秋节自古便有祭月、赏月、吃月饼等民俗,流传至今,经久不息
- 1. Ajax 概述Ajax 的英文全称是 ”Asynchronous JavaScript and XML&l
- C# 匿名函数、lambda表达式、Linq查询一、匿名函数的使用匿名函数是一个“内联”语句或表达式
- Java设计模式访问者模式模式概念访问者模式表示一个作用于某对象结构中的各元素的操作,它使你可以在不改变各元素类的前提下定义作用于这些元素的
- 前言最近需求中,需要实现 卫星菜单的需求,最终通过自定义View和动画属性来实现,具体功能如下:1.自定义Viewimport androi
- 前言本章是关于Java数组的最全汇总,本篇为汇总上篇,主要讲了一维数组的相关内容。数组是最常见的一种数据结构,它是相同类型的用一个标识符封装
- springboottest测试依赖和使用<dependency> <groupId>or
- 前言SSL Socket通讯是对socket的扩展,增加Socket通讯的数据安全性,SSL认证分为单向和双向认证。单向认证只认证服务器端的
- 本文实例讲述了C#及WPF获取本机所有字体和颜色的方法。分享给大家供大家参考。具体如下:WPF 获取所有的字体:System.Drawing
- C# 结构体在 C# 中,结构体是值类型数据结构。它使得一个单一变量可以存储各种数据类型的相关数据。struct 关键字用于创建结构体。定义
- java多线程-同步块Java 同步块(synchronized block)用来标记方法或者代码块是同步的。Java 同步块用来避免竞争。
- 1. 安装JDK解释: JDK是Java编写环境--开发环境注: 安装路径不可出现中文及标点符号。比如:D:\Java\jdk81.1 下载