Java Zookeeper分布式分片算法超详细讲解流程
作者:Redick01 发布时间:2023-07-08 06:56:36
背景
公司的一个服务需要做类似于分片的逻辑,一开始服务基于传统部署方式通过本地配置文件配置的方式就可以指定该机器服务的分片内容如:0,1,2,3,随着系统的升级迭代,该服务进行了容器化部署,所以原来基于本地配置文件各自配置分片数据的方式就不适用了,原来的部署方式使得服务是有状态,是一种非云原生的方式,所以该服务要重新设计实现一套分布式服务分片
逻辑。
技术方案
分布式协调中间件
要实现分布式服务分片的能力,需要有一个分布式中间件,如:Redis
,Mysql
,Zookeeper
等等都可以,我们选用Zookeeper
。
基于Zookeeper的技术方案
使用Zookeeper
主要是基于Zookeeper
的临时节点和节点变化监听机制,具体的技术设计如下:
服务注册目录设计
Zookeeper
的数据存储结构类似于目录,服务注册后的目录类似如下结构:
解释下该目录结构,首先/xxxx/xxxx/sharding
是区别于其他业务的的目录,该目录节点是持久的,service
是服务目录,标识一个服务,该节点也是持久的,ip1
,ip2
是该服务注册到Zookeeper
的机器列表节点,该节点是临时节点。
/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2
服务分片处理流程
服务启动,创建
CuratorFramework
客户端,设置客户端连接状态监听;向
Zookeeper
注册该机器的信息,这里设计简单,机器信息就是ip
地址;注册机器信息后,从
Zookeeper
获取所有注册信息;根据
Zookeeper
获取的所有注册机器信息根据分片算法进行分片计算。
编码实现
ZookeeperConfig
Zookeeper
的配置信息
@Data
public class ZookeeperConfig {
/**
* zk集群地址
*/
private String zkAddress;
/**
* 注册服务目录
*/
private String nodePath;
/**
* 分片的服务名
*/
private String serviceName;
/**
* 分片总数
*/
private Integer shardingCount;
public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
this.zkAddress = zkAddress;
this.nodePath = nodePath;
this.serviceName = "/" + serviceName;
this.shardingCount = shardingCount;
}
/**
* 等待重试的间隔时间的初始值.
* 单位毫秒.
*/
private int baseSleepTimeMilliseconds = 1000;
/**
* 等待重试的间隔时间的最大值.
* 单位毫秒.
*/
private int maxSleepTimeMilliseconds = 3000;
/**
* 最大重试次数.
*/
private int maxRetries = 3;
/**
* 会话超时时间.
* 单位毫秒.
*/
private int sessionTimeoutMilliseconds;
/**
* 连接超时时间.
* 单位毫秒.
*/
private int connectionTimeoutMilliseconds;
}
InstanceInfo注册机器
@AllArgsConstructor
@EqualsAndHashCode()
public class InstanceInfo {
private String ip;
public String getInstance() {
return ip;
}
}
ZookeeperShardingService分片服务
@Slf4j
public class ZookeeperShardingService {
public final Map<String, List<Integer>> caches = new HashMap<>(16);
private final CuratorFramework client;
private final ZookeeperConfig zkConfig;
private final ShardingStrategy shardingStrategy;
private final InstanceInfo instanceInfo;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
this.zkConfig = zkConfig;
log.info("开始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getZkAddress())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
this.shardingStrategy = shardingStrategy;
HostInfo host = new HostInfo();
this.instanceInfo = new InstanceInfo(host.getAddress());
client = builder.build();
client.getConnectionStateListenable().addListener(new ConnectionListener());
client.start();
try {
COUNT_DOWN_LATCH.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 注册服务节点监听
registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
} catch (final Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
/**
* 子节点 *
* @param nodePath 主节点
* @param listener *
*/
private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
try {
// 1. 创建一个PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
// 2. 添加目录 *
pathChildrenCache.getListenable().addListener(listener);
// 3. 启动 *
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
log.error("注册子目录 * 出现异常,nodePath:{}",nodePath,e);
throw new RuntimeException(e);
}
}
/**
* 服务启动,注册zk节点
* @throws Exception 异常
*/
private void zkOp() throws Exception {
// 是否存在ruubypay-sharding主节点
if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
}
// 是否存服务主节点
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
// 创建服务主节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
}
// 检查是否存在临时节点
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance());
// 创建临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
"/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
}
shardingFromZk();
}
/**
* 从zk获取机器列表并进行分片
* @throws Exception 异常
*/
private void shardingFromZk() throws Exception {
// 从 serviceName 节点下获取所有Ip列表
final GetChildrenBuilder childrenBuilder = client.getChildren();
final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
List<InstanceInfo> res = new ArrayList<>();
instanceList.forEach(s -> {
res.add(new InstanceInfo(s));
});
Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
// 先清一遍缓存
caches.clear();
shardingResult.forEach((k, v) -> {
caches.put(k.getInstance().split("-")[0], v);
});
}
/**
* zk连接监听
*/
private class ConnectionListener implements ConnectionStateListener {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
try {
zkOp();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
COUNT_DOWN_LATCH.countDown();
}
}
}
}
/**
* 子节点监听
*/
private class ChildrenPathListener implements PathChildrenCacheListener {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
PathChildrenCacheEvent.Type type = event.getType();
if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
try {
shardingFromZk();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
}
分片算法
采用平均分配的算法
public interface ShardingStrategy {
Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
}
public class AverageAllocationShardingStrategy implements ShardingStrategy {
@Override
public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
if (list.isEmpty()) {
return null;
}
Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
addAliquant(list, shardingCount, result);
return result;
}
private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
int count = 0;
for (InstanceInfo each : instanceInfos) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
int aliquant = shardingTotalCount % instanceInfos.size();
int count = 0;
for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
}
count++;
}
}
}
来源:https://blog.csdn.net/qq_31279701/article/details/128565133


猜你喜欢
- 效果图示例结构图代码解析导入dataBinding dataBinding{ &nb
- 本文实例为大家分享了Android实现3D层叠式卡片图片展示的具体代码,供大家参考,具体内容如下先看效果好了效果看了,感兴趣的往下看哦!整体
- 使用正则表达式进行替换:代码片段:String documentTxt = EntityUtils.toString(entity,&quo
- 本文实例讲述了Android编程之菜单实现方法。分享给大家供大家参考,具体如下:菜单是许多应用程序不可或缺的一部分,Android中更是如此
- 说明使用工具:brew caskbrew cask是一个用命令行管理Mac下应用的工具,提供了自动安装和卸载功能,能够自动从官网上下载并安装
- Kotlin 基础教程之类、对象、接口Kotlin中类、接口相关概念与Java一样,包括类名、属性、方法、继承等,如下示例:interfac
- 一、什么是内存泄露?Java使用有向图机制,通过GC自动检查内存中的对象(什么时候检查由虚拟机决定),如果GC发现一个或一组对象为不可到达状
- 简介工厂方法模式是什么?为什么要有工厂方法模式,不是有了简单工厂模式了吗?两个模式都有工厂,那有什么不同呢?功工厂方式模式是怎样实现的?OK
- 注意,本文不是字符串排序,是字符串数组的排序。方法分别是:1、低位优先键索引排序2、高位优先建索引排序3、Java自带排序(经过调优的归并排
- 前言解释:之前用的ScreenToGif录屏,因为上传的.gif最大不超过5MB,所以做了不少删帧和色彩弱化等处理,这才导致色彩看上去不是很
- 项目介绍springboot搭建的访客管理系统,针对高端基地做严格把控来访人员信息管理,用户后端可以设置多个管理员帐号,给予不同部门的管理层
- 前言在Web应用开发过程中,一般都涵盖一些常用功能的实现,如数据库访问、异常处理、消息队列、缓存服务、OSS服务,以及接口日志配置,接口文档
- 背景在Java中一个回调的操作是一个在一些操作完成之后被传递到另一个函数中并且被执行的函数。一个回调函数既可以被同步或者异步执行。在一个同步
- 任何一个类都是Class类的实例对象,这个实例对象有三种表示方式第一种表示方式(任何一个类都有一个隐含的静态成员变量class):Class
- 目录1.项目需求描述2.整体思路3.功能实现1.项目需求描述通过订单号获取某系统内订单的详细数据,不需要账号密码的登录验证,但有图片验证码的
- 前言Spring Seuciry相关的内容看了实在是太多了,但总觉得还是理解地不够巩固,还是需要靠知识输出做巩固。相关版本:java: jd
- 将Android项目导出为Library1.修改build.gradle中的Module:app文件最终如下:2. 进入到项目文件夹目录,保
- 本文实例讲述了Spring与Struts整合之让Spring管理控制器操作。分享给大家供大家参考,具体如下:一 Web配置<?xml
- 为了是java中的对象便于理解,我们可以使用一款比较好用的数据格式,在数据解析的时候也会经常用到,它就是JSON。在这里我们转换对象和字符串
- 前言前段时间碰到了中转文件的需求,需要使用HttpClient中转一下文件,过程为:在实现这个需求的过程中就用得到了MultipartFil