java动态线程池的简单实现思路
作者:耶耶耶耶yeyeye 发布时间:2023-10-18 15:53:46
什么是动态线程池?
在线程池日常实践中我们常常会遇到以下问题:
代码中创建了一个线程池却不知道核心参数设置多少比较合适。
参数设置好后,上线发现需要调整,改代码重启服务非常麻烦。
线程池相对于开发人员来说是个黑箱,运行情况在出现问题 前很难被感知。
因此,动态可监控线程池一种针对以上痛点开发的线程池管理工具。主要可实现功能有:提供对 Spring 应用内线程池实例的全局管控、应用运行时动态变更线程池参数以及线程池数据采集和监控阈值报警。
已经实现的优秀开源动态线程池
hippo4j、dynamic-tp.....
实现思路
核心管理类
需要能实现对线程池的
服务注册
获取已经注册好的线程池
以及对注册号线程池参数的刷新。
对于每一个线程池,我们使用一个线程池名字作为标识每个线程池的唯一ID。
伪代码实现
public class DtpRegistry {
/**
* 储存线程池
*/
private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>();
/**
* 获取线程池
* @param executorName 线程池名字
*/
public static Executor getExecutor(String executorName) {
return EXECUTOR_MAP.get(executorName);
}
/**
* 线程池注册
* @param executorName 线程池名字
*/
public static void registry(String executorName, Executor executor) {
//注册
EXECUTOR_MAP.put(executorName, executorWrapper);
}
/**
* 刷新线程池参数
* @param executorName 线程池名字
* @param properties 线程池参数
*/
public static void refresh(String executorName, ThreadPoolProperties properties) {
Executor executor = EXECUTOR_MAP.get(executorName)
//刷新参数
//.......
}
}
如何创建线程池?
STEP 1. 我们可以使用yml配置文件的方式配置一个线程池,将线程池实例的创建交由Spring容器。
相关配置
public class DtpProperties {
private List<ThreadPoolProperties> executors;
}
public class ThreadPoolProperties {
/**
* 标识每个线程池的唯一名字
*/
private String poolName;
private String poolType = "common";
/**
* 是否为守护线程
*/
private boolean isDaemon = false;
/**
* 以下都是核心参数
*/
private int corePoolSize = 1;
private int maximumPoolSize = 1;
private long keepAliveTime;
private TimeUnit timeUnit = TimeUnit.SECONDS;
private String queueType = "arrayBlockingQueue";
private int queueSize = 5;
private String threadFactoryPrefix = "-td-";
private String RejectedExecutionHandler;
}
yml example:
spring:
dtp:
executors:
# 线程池1
- poolName: dtpExecutor1
corePoolSize: 5
maximumPoolSize: 10
# 线程池2
- poolName: dtpExecutor2
corePoolSize: 2
maximumPoolSize: 15
STEP 2 根据配置信息添加线程池的BeanDefinition
关键类
@Slf4j
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private Environment environment;
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
log.info("注册");
//绑定资源
DtpProperties dtpProperties = new DtpProperties();
ResourceBundlerUtil.bind(environment, dtpProperties);
List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
if (Objects.isNull(executors)) {
log.info("未检测本地到配置文件线程池");
return;
}
//注册beanDefinition
executors.forEach((executorProp) -> {
BeanUtil.registerIfAbsent(registry, executorProp);
});
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
}
/**
*
* 工具类
*
*/
public class BeanUtil{
public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) {
register(registry, executorProp.getPoolName(), executorProp);
}
public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) {
Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType());
Object[] args = assembleArgs(executorProp);
register(registry, beanName, executorType, args);
}
public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
for (Object arg : args) {
builder.addConstructorArgValue(arg);
}
registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
}
private static Object[] assembleArgs(ThreadPoolProperties executorProp) {
return new Object[]{
executorProp.getCorePoolSize(),
executorProp.getMaximumPoolSize(),
executorProp.getKeepAliveTime(),
executorProp.getTimeUnit(),
QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()),
new NamedThreadFactory(
executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(),
executorProp.isDaemon()
),
//先默认不做设置
RejectPolicy.ABORT.getValue()
};
}
}
下面解释一下这个类的作用,environment实例中储存着spring启动时解析的yml配置,所以我们spring提供的Binder将配置绑定到我们前面定义的DtpProperties类中,方便后续使用。接下来的比较简单,就是将线程池的BeanDefinition注册到IOC容器中,让spring去帮我们实例化这个bean。
STEP 3. 将已经实例化的线程池注册到核心类 DtpRegistry 中
我们注册了 beanDefinition 后,spring会帮我们实例化出来, 在这之后我们可以根据需要将这个bean进行进一步的处理,spring也提供了很多机制让我们对bean的生命周期管理进行更多的扩展。对应到这里我们就是将实例化出来的线程池注册到核心类 DtpRegistry 中进行管理。
这里我们使用 BeanPostProcessor 进行处理。
@Slf4j
public class DtpBeanPostProcessor implements BeanPostProcessor {
private DefaultListableBeanFactory beanFactory;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DtpExecutor) {
//直接纳入管理
DtpRegistry.registry(beanName, (DtpExecutor) bean);
}
return bean;
}
}
这里的逻辑很简单, 就是判断一下这个bean是不是线程池,是就统一管理起来。
STEP 4. 启用 BeanDefinitionRegistrar 和 BeanPostProcessor
在springboot程序中,只要加一个@MapperScan注解就能启用mybatis的功能,我们可以学习其在spring中的启用方式,自定义一个注解:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public @interface EnableDynamicThreadPool {
}
其中,比较关键的是@Import注解,spring会导入注解中的类DtpImportSelector
而DtpImportSelector这个类实现了:
public class DtpImportSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[]{
DtpImportBeanDefinitionRegistrar.class.getName(),
DtpBeanPostProcessor.class.getName()
};
}
}
这样,只要我们再启动类或者配置类上加上@EnableDynamicThreadPool这个注解,spring就会将DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor这两个类加入spring容器管理,从而实现我们的线程池的注册。
@SpringBootApplication
@EnableDynamicThreadPool
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
如何实现线程池配置的动态刷新
首先明确一点,对于线程池的实现类,例如:ThreadPoolExecutor等,都有提供核心参数对应的 set 方法,让我们实现参数修改。因此,在核心类DtpRegistry中的refresh方法,我们可以这样写:
public class DtpRegistry {
/**
* 储存线程池
*/
private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();
/**
* 刷新线程池参数
* @param executorName 线程池名字
* @param properties 线程池参数
*/
public static void refresh(String executorName, ThreadPoolProperties properties) {
ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName)
//设置参数
executor.setCorePoolSize(...);
executor.setMaximumPoolSize(...);
......
}
}
而这些新参数怎么来呢?我们可以引入Nacos、Apollo等配置中心,实现他们的 * 方法,在 * 方法里调用DtpRegistry的refresh方法刷新即可。
来源:https://juejin.cn/post/7240065163486216253


猜你喜欢
- 什么是tcpTcp通信有两个特点分别是面向连接,具有可靠性.面向连接:指的是客户端与服务端之间的连接,在通信之前会有三次握手的机制来确保连接
- 一、校验分类数据的校验一般分为**前端校验、后端校验**二、前端校验前端校验是最为明显的,先说一下:① HTML非空校验 如 HTML5 新
- 普通校验导入依赖:默认的报错:没有提示具体的属性设置自己的错误信息提示:创建 ValidationMessages.properties内容
- 1、添加spring相关jar包2、配置ehcache jar包。3、添加ehcache mybatis 适配器jar包(在mybatis官
- 一、 简介Opitonal是java8引入的一个新类,目的是为了解决空指针异常问题。本质上,这是一个包含有可选值的包装类,这意味着 Opti
- 最近滑动验证码在很多网站逐步流行起来,一方面对用户体验来说,比较新颖,操作简单,另一方面相对图形验证码来说,安全性并没有很大的降低。当然到目
- Spring核心Spring核心是 IOC 和 AOP 。所谓IoC,对于spring框架来说,就是由spring来负责控制对象的生命周期和
- 前言背景平时开发中遇到根据当前用户的角色,只能查看数据权限范围的数据需求。列表实现方案有两种,一是在开发初期就做好判断赛选,但如果这个需求是
- 阿姆斯特朗数阿姆斯特朗数是一个数字,等于每个数字的幂乘以总位数。 例如,诸如0、1、153、370、371和407、1634、8208、94
- 前言说实话,我在 * 工作的时候,就见过Gradle。但是当时我一直不知道这是什么东西。而且 * 工具组的工程师还将其和Android Stud
- 1. 场景描述本节结合springboot2、springmvc、mybatis、swagger2等,搭建一个完整的增删改查项目,希望通过这
- 一、 搭建struts2环境在myeclipse下,右击项目->MyEclipse->Project Facets->in
- 上一篇实现了移动端微信消息界面功能,以此为基础继续完善服务端功能服务端微信消息页实现微信消息界面的实现,和登录,注册是类似的,无非就是接受客
- 从这章开始,会介绍几个常用的函数式接口工具,首先先来看下这个大家族:首先从Function接口开始介绍一. 概述该接口顾名思义,函数的意思,
- 下载maven 解压路径: 打开环境变量:右键此电脑-属性-高级系统设置-高级-环境变量添加以下系统变量:测试:win+
- spring cloud我想做成一个系列,所以spring cloud+eureka后面会慢慢说到的,有兴趣的小伙伴可以关注后续!这一节就简
- 一、什么是Spring?Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架二、如何在程序中获取Spring配置的be
- 这几年一直在做手机上和电视盒的App,几乎没有考虑过横竖屏切换的问题。电视盒好说,横屏不变,你要是给它设计个竖屏人家也没机会使;而手机上的应
- 本文实例讲述了C#实现字符串与图片的Base64编码转换操作。分享给大家供大家参考,具体如下:using System;using Syst
- 在整合SpringBoot和Mybatis-plus时,想写自定义的sql,所以创建了Mapper.xml文件,但是启动后却老是报错:org