Java动态线程池插件dynamic-tp集成zookeeper
作者:Redick01 发布时间:2023-11-25 03:41:38
前言
dynamic-tp
是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,在配置中心的支持上最开始的时候支持Nacos
和Apollo
,由于笔者公司用的配置中心是Zookeeper
,所以就想着扩展支持Zookeeper
,在了解源码支持发现dynamic-tp
的扩展能力做的很好,提供了扩展接口,只要我开发对应的配置中心模块即可,最终笔者实现了Zookeeper
的支持并贡献到社区。接下来我通过源码解析方式介绍下Zookeeper
配置中心的接入。
配置刷新
dynamic-tp
提供了一个刷新配置的接口Refresher
,抽象类AbstractRefresher
实现刷新配置接口的刷新配置方法refresh
,该方法能根据配置类型内容和配置解析配置并刷新动态线程池的相关配置,由DtpRegistry
负责刷新线程池配置,事件发布订阅模式操作Web容器参数,代码如下:
public interface Refresher {
/**
* Refresh with specify content.
* @param content content
* @param fileType file type
*/
void refresh(String content, ConfigFileTypeEnum fileType);
}
@Slf4j
public abstract class AbstractRefresher implements Refresher {
@Resource
private DtpProperties dtpProperties;
@Resource
private ApplicationEventMulticaster applicationEventMulticaster;
@Override
public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) {
if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) {
return;
}
try {
// 根据配置内容和配置类型将配置内容转成Map
val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum);
doRefresh(prop);
} catch (IOException e) {
log.error("DynamicTp refresh error, content: {}, fileType: {}",
content, fileTypeEnum, e);
}
}
private void doRefresh(Map<Object, Object> properties) {
// 将Map中的配置转换成DtpProperties
ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
Binder binder = new Binder(sources);
ResolvableType type = ResolvableType.forClass(DtpProperties.class);
Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
binder.bind(MAIN_PROPERTIES_PREFIX, target);
// 刷新动态线程池配置
DtpRegistry.refresh(dtpProperties);
// 发布刷新实现,该事件用于控制Web容器线程池参数控制
publishEvent();
}
private void publishEvent() {
RefreshEvent event = new RefreshEvent(this, dtpProperties);
applicationEventMulticaster.multicastEvent(event);
}
}
Zookeeper配置中心接入扩展实现
基于AbstractRefresher
就可以实现Zookeeper
配置中心的扩展了,Zookeeper
的扩展实现继承AbstractRefresher
,Zookeeper
的扩展实现只需要监听配置中心的配置变更即可拿到配置内容,然后通过refresh
刷新配置即可。代码如下:
ZookeeperRefresher
继承AbstractRefresher
,实现InitializingBean
,afterPropertiesSet
方法逻辑从配置DtpProperties
获取Zookeeper
的配置信息,CuratorFrameworkFactory
创建客户端,设置 * ,这里有两种 * ,一个是连接监听ConnectionStateListener
,一个是节点变动监听CuratorListener
,出发监听后loadNode
负责从Zookeeper
获取配置文件配置并组装配置内容,然后通过refresh
刷新配置,注意,Zookeeper
配置目前配置类型仅支持properties
。
@Slf4j
public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean {
@Resource
private DtpProperties dtpProperties;
private CuratorFramework curatorFramework;
@Override
public void afterPropertiesSet() throws Exception {
DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper();
curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(),
new ExponentialBackoffRetry(1000, 3));
String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(),
zookeeper.getConfigVersion()), zookeeper.getNode());
final ConnectionStateListener connectionStateListener = (client, newState) -> {
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
loadNode(nodePath);
}};
final CuratorListener curatorListener = (client, curatorEvent) -> {
final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
if (null != watchedEvent) {
switch (watchedEvent.getType()) {
case NodeChildrenChanged:
case NodeDataChanged:
loadNode(nodePath);
break;
default:
break;
}
}};
curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
curatorFramework.getCuratorListenable().addListener(curatorListener);
curatorFramework.start();
log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);
}
/**
* load config and refresh
* @param nodePath config path
*/
public void loadNode(String nodePath) {
try {
final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
final List<String> children = childrenBuilder.watched().forPath(nodePath);
StringBuilder content = new StringBuilder();
children.forEach(c -> {
String n = ZKPaths.makePath(nodePath, c);
final String nodeName = ZKPaths.getNodeFromPath(n);
final GetDataBuilder data = curatorFramework.getData();
String value = "";
try {
value = new String(data.watched().forPath(n), StandardCharsets.UTF_8);
} catch (Exception e) {
log.error("zk config value watched exception.", e);
}
content.append(nodeName).append("=").append(value).append("\n");
});
refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES);
} catch (Exception e) {
log.error("load zk node error, nodePath is {}", nodePath, e);
}
}
}
来源:https://blog.csdn.net/qq_31279701/article/details/123551397


猜你喜欢
- Activiti 介绍Activiti是一个开源的工作流引擎,它实现了BPMN 2.0规范,可以发布设计好的流程定义,并通过api进行流程调
- Web Fragment 是什么 - 它是在 servlet 3.0开始支持的,可以把一个dy web项目拆分为多个项目,解耦合,使其在项目
- 笔者近2天在 Android Studio上玩了一下百度地图,碰到了常见的"230错误 APP Scode校验失败",下
- 刚学完JDBC不久,做了一个简单的学生管理系统,可能还有不完善的地方,望各路大神见谅。废话不多说,我先贴个图让大家让大家瞅瞅,觉得是你想要的
- 1 MyBatisPlusConfigMyBatisPlus配置类。package com.config;import
- SpringMVC异常处理机制(一)项目前准备首先参照文章Spring课程工程构建+SpringMVC简介及其快速入门搭建项目搭建好一个项目
- 本文实例讲述了C#使用iTextSharp设置PDF所有页面背景图功能的方法。分享给大家供大家参考。具体如下:在生成PDF 的时候,虽然可以
- 使用Post添加数据到数据库出现方块乱码解决方法,在web.xml里最前面添加过滤器,代码如下,放在最前面,因为有优先级,要首先拦截<
- Elasticsearch 通常如何工作?我们将文档索引到 Elasticsearch 中并对其运行查询以获得满足提供的搜索条件的文档。 我
- 本文实例为大家分享了Java使用单链表实现约瑟夫环的具体代码,供大家参考,具体内容如下构建一个单向的环形链表思路1.先创建第一个节点, 让f
- Jetty和tomcat的比较Tomcat和Jetty都是一种Servlet引擎,他们都支持标准的servlet规范和JavaEE的规范。架
- 一、前言跟很多小伙伴聊天,发现一个严重的问题,很多小伙伴横向发展的貌似很不错,很多技术都能说出一二,但是如果在某个技术上深挖一下就不行了,问
- Lambda 表达式Lambda 表达式是现代 C++ 中最重要的特性之一,而 Lambda 表达式,实际上就是提供了一个类似匿名函数的特性
- 一、简介Join方法主要是用来阻塞调用线程,直到某个线程终止或经过了指定时间为止。官方的解释比较乏味,通俗的说就是创建一个子线程,给它加了这
- 之前写过3篇手势密码的demo,不过没有集成到真实的企业项目中,这几天正好领到一个手势密码项目,昨天刚好弄完,今天抽空整理下,目前还没有完善
- 一、引言90坦克大战,很经典的一款游戏,当年与小伙伴一人一个手柄,搬上小板凳坐在电视机前,身体时不时跟随手柄摇晃着,时而表情严肃、眉头紧锁,
- 不适用click而用touch自定义监听:class myOnGestureListener extends GestureDetector
- 数学原理:  
- 本文实例讲述了C#判断字符串是否存在字母及字符串中字符的替换的方法。分享给大家供大家参考。具体实现方法如下:首先要添加对命名空间“using
- 方法一:通过Theme.Translucent@android:style/Theme.Translucent@android:style/