DUCC配置平台实现一个动态化线程池示例代码
作者:京东云开发者 发布时间:2023-11-28 12:07:39
作者:京东零售 张宾
1.背景
在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。
本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。
2.代码实现
当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。
setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:
setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。
Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。
基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:
(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;
(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;
(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;
接下来代码一一实现:
(1)动态线程池类
/**
* 动态线程池
*
*/
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}
(2)动态线程池配置定时刷新类
@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
/**
* Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
*/
private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
/**
* @param threadPoolBeanName
* @param threadPoolTaskExecutor
*/
public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
}
@Override
public void afterPropertiesSet() throws Exception {
this.refresh();
//创建定时任务线程池
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
//延迟1秒执行,每个1分钟check一次
executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
}
private void refresh() {
String dynamicThreadPool = "";
try {
if (DTP_REGISTRY.isEmpty()) {
log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
return;
}
dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
if (StringUtils.isBlank(dynamicThreadPool)) {
log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
return;
}
log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
});
if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
return;
}
for (ThreadPoolProperties properties : threadPoolPropertiesList) {
doRefresh(properties);
}
} catch (Exception e) {
log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
}
}
/**
* @param properties
*/
private void doRefresh(ThreadPoolProperties properties) {
if (StringUtils.isBlank(properties.getThreadPoolBeanName())
|| properties.getCorePoolSize() < 1
|| properties.getMaxPoolSize() < 1
|| properties.getMaxPoolSize() < properties.getCorePoolSize()) {
log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
return;
}
DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
if (Objects.isNull(threadPoolTaskExecutor)) {
log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
return;
}
ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
&& Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
return;
}
if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
}
if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
}
ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
}
private class RefreshThreadPoolConfig extends TimerTask {
private RefreshThreadPoolConfig() {
}
@Override
public void run() {
DynamicThreadPoolRefresh.this.refresh();
}
}
}
线程池配置类
@Data
public class ThreadPoolProperties {
/**
* 线程池名称
*/
private String threadPoolBeanName;
/**
* 线程池核心线程数量
*/
private int corePoolSize;
/**
* 线程池最大线程池数量
*/
private int maxPoolSize;
}
(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key
ducc配置平台使用见:cf.jd.com/pages/viewp…
动态线程池配置key:dynamic.thread.pool
配置value:
[ { "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor", "corePoolSize": 32, "maxPoolSize": 128 }]
(4) 应用启动刷新应用本地动态线程池配置
@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolTaskExecutor) {
DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
}
return bean;
}
}
3.动态线程池应用
动态线程池Bean声明
<!-- 普通线程池 -->
<bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
<!-- 核心线程数,默认为 -->
<property name="corePoolSize" value="128"/>
<!-- 最大线程数,默认为Integer.MAX_VALUE -->
<property name="maxPoolSize" value="512"/>
<!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
<property name="queueCapacity" value="500"/>
<!-- 线程池维护线程所允许的空闲时间,默认为60s -->
<property name="keepAliveSeconds" value="60"/>
<!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
<!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
<!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
<!-- 动态线程池 -->
<bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
<!-- 核心线程数,默认为 -->
<property name="corePoolSize" value="32"/>
<!-- 最大线程数,默认为Integer.MAX_VALUE -->
<property name="maxPoolSize" value="128"/>
<!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
<property name="queueCapacity" value="500"/>
<!-- 线程池维护线程所允许的空闲时间,默认为60s -->
<property name="keepAliveSeconds" value="60"/>
<!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
<property name="rejectedExecutionHandler">
<!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
<!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
<!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
</property>
</bean>
<!-- 动态线程池刷新配置 -->
<bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
<bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>
业务类注入Spring Bean后,直接使用即可
@Resource
private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
Runnable asyncTask = ()->{...};
CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);
4.小结
本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。
来源:https://juejin.cn/post/7200417924640489532


猜你喜欢
- SpringBoot找不到映射文件org.apache.ibatis.binding.BindingException: Invalid b
- Dataway介绍Dataway 是基于 DataQL 服务聚合能力,为应用提供的一个接口配置工具。使得使用者无需开发任何代码就配置一个满足
- 树的结构说得差不多了,现在我们来说说一种数据结构叫做哈希表(hash table),哈希表有是干什么用的呢?我们知道树的操作的时间复杂度通常
- 最近看到一道有点意思的逻辑算法题,便着手实现一下。题目是要求打印 出N*N顺时针螺旋数组,规律如下:// 1 2 &
- 本文实例为大家分享了Flutter Animation实现缩放和滑动动画的具体代码,供大家参考,具体内容如下Animation对象是Flut
- 问题描述:String preStr = "a.b.c"; // 这里要把该字符串按小圆点进行分割,成"a&q
- Mybatis多层嵌套查询三张表:user article blog表的存储sql文件/*Navicat MySQL Data Transf
- 1.下载AndroidStudioAndroidStudio官网下载地址:http://developer.android.com/intl
- 什么是RecyclerView关于RecyclerView,是一个主要用于展示和回收View的有一个控件,在官用了一句话来概括Recycle
- 本文实例讲述了Android自定义照相机Camera出现黑屏的解决方法。分享给大家供大家参考,具体如下:对于一些手机,像HTC,当自定义Ca
- 约瑟夫环(约瑟夫问题)是一个数学的应用问题:已知 n 个人(以编号1,2,3...n分别表示)围坐在一张圆桌周围。. 从编号为 k 的人开始
- 背景在实际开发过程中,会遇到需要编写各类打印模板模板的需求,当然这些在WPF开发中更为常见,但是使用XAML写编辑的打印模板又不能直接发送给
- 你平时是怎么读取文件的?使用流读取。是的没错,C#给我们提供了非常强大的类库(又一次吹捧了.NET一番),里面封装了几乎所有我们可以想到的和
- 为什么需要多线程?模型的简化,如某些程序是由多个相对独立任务的运行:图形界面的出现,输入、输出的阻塞多核CPU的更好利用异步行为的需要Jav
- 修改\packages\apps\Camera\res\values\arrays.xml中的以下代码: <string-array
- C# DateTime与时间戳的相互转换,包括JavaScript时间戳和Unix的时间戳。1. 什么是时间戳首先要清楚JavaScript
- Android中有两种主要方式使用Service,通过调用Context的startService方法或调用Context的bindServ
- 前言最近遇到了这样一个工作场景,需要写一批dubbo接口,再将dubbo接口注册到网关中,但是当dubbo接口异常的时候会给前端返回非常不友
- MybatisMyBatis ,是国内最火的持久层框架采用了ORM思想解决了实体类和数据库表映射的问题。对JDBC进行了封装,屏蔽了JDBC
- 本文实例讲述了Android编程实现的手写板和涂鸦功能。分享给大家供大家参考,具体如下:下面仿一个Android手写板和涂鸦的功能,直接上代