ZooKeeper 实现分布式锁的方法示例
作者:Beck's Blog 发布时间:2023-03-20 07:26:43
ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。
节点
在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。
Znode 又分为以下四种类型:
类型 | 描述 |
---|---|
持久节点 | 节点创建后,会一直存在,不会因客户端会话失效而删除 |
持久顺序节点 | 基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
临时节点 | 客户端会话失效或连接关闭后,该节点会被自动删除 |
临时顺序节点 | 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名 |
锁原理
ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。
如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。
代码测试
请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。
创建 .NET Core 控制台程序 Nuget
安装 ZooKeeperNetEx.Recipes
创建 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000;
private const string CONNECTION_STRING = "127.0.0.1:2181";
private ZooKeeper CreateClient()
{
var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
Stopwatch sw = new Stopwatch();
sw.Start();
while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
{
var state = zooKeeper.getState();
if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING)
{
break;
}
}
sw.Stop();
return zooKeeper;
}
class NullWatcher : Watcher
{
public static readonly NullWatcher Instance = new NullWatcher();
private NullWatcher() { }
public override Task process(WatchedEvent @event)
{
return Task.CompletedTask;
}
}
添加 Lock 方法
/// <summary>
/// 加锁
/// </summary>
/// <param name="key">加锁的节点名</param>
/// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param>
/// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param>
/// <returns></returns>
public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
{
// 获取 ZooKeeper Client
ZooKeeper keeper = CreateClient();
// 指定锁节点
WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);
var lockCallback = new LockCallback(() =>
{
lockAcquiredAction.Invoke();
writeLock.unlock();
}, lockReleasedAction);
// 绑定锁获取和释放的监听对象
writeLock.setLockListener(lockCallback);
// 获取锁(获取失败时会监听上一个临时节点)
await writeLock.Lock();
}
class LockCallback : LockListener
{
private readonly Action _lockAcquiredAction;
private readonly Action _lockReleasedAction;
public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
{
_lockAcquiredAction = lockAcquiredAction;
_lockReleasedAction = lockReleasedAction;
}
/// <summary>
/// 获取锁成功回调
/// </summary>
/// <returns></returns>
public Task lockAcquired()
{
_lockAcquiredAction?.Invoke();
return Task.FromResult(0);
}
/// <summary>
/// 释放锁成功回调
/// </summary>
/// <returns></returns>
public Task lockReleased()
{
_lockReleasedAction?.Invoke();
return Task.FromResult(0);
}
}
多线程模拟测试
static async Task RunAsync()
{
Parallel.For(1, 10, async (i) =>
{
await new ZooKeeprDistributedLock().Lock("locks", () =>
{
Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // 业务逻辑...
}, () =>
{
Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("-------------------------------");
});
});
await Task.CompletedTask;
}
虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。
来源:http://beckjin.com/2019/06/16/zookeeper-lock/


猜你喜欢
- 本文实例讲述了C#实现读写ini文件类。分享给大家供大家参考。具体如下:这个C#类封装了对INI配置文件进行操作所需的各种函数,包括读取键值
- 互斥锁(Mutex)互斥锁是一个互斥的同步对象,意味着同一时间有且仅有一个线程可以获取它。互斥锁可适用于一个共享资源每次只能被一个线程访问的
- Spring Boot如何实现分布式系统中的服务发现和注册?随着互联网的快速发展,越来越多的企业开始将自己的业务迁移到分布式系统中。在这种情
- 1. InputStream -> byte[]引入 apache.commons.is 包import org.apache.com
- 本文以一个简单实例讲述了C#装箱和拆箱操作的实现方法,简单来说装箱是将值类型转换为引用类型;拆箱是将引用类型转换为值类型,是涉及栈和堆的使用
- android在网络上下载文件,供大家参考,具体内容如下步骤: 1.使用HTTP协议下载文件- 创建一个HttpURLConnection对
- 本文实例为大家分享了Winform实现导入导出Excel文件的具体代码,供大家参考,具体内容如下/// <summary> &n
- package com.tiantian.algorithms;/** * _|_1 
- 一、代理的概念 * 技术是整个java技术中最重要的一个技术,它是学习java框架的基础,不会 * 技术,那么在学习Spring这些框架
- 目录前言:一、餐馆合并菜单二、改进菜单实现三、迭代器模式总结前言:迭代器模式平时用的不多,因为不管C#还是Java都已经帮我封装了,但是你是
- 什么是TaskScheduler?SynchronizationContext是对“调度程序(scheduler)&am
- C#自己没有Inputbox这个类,但是Inputbox也蛮好用的,所以有两种方法可以使用一:间接调用vb中的Inputbox功能 
- 上移动端的测试课,老师和同学们用的都是eclipse, 只有我一个人用的是idea(用了两款软件之后觉得IDEA更好),真的太难了,配置
- 一、场景笔者就Zuul网关下实现其负载均衡与熔断机制(雪崩)进行实践,前提是已经导入zuul相关依赖springboot版本:1.5.9.R
- 一、 序列化和反序列化概念Serialization(序列化)是一种将对象以一连串的字节描述的过程;反序列化deserialization是
- 本文实例讲述了c#用for语句输出一个三角形的方法。分享给大家供大家参考。具体分析如下:这是一道面试题,要求是这样的:只使用一个for循环输
- 坑出现的环境一般情况下切割字符串会使用split或者StringTokenizer,如下代码String s = ",,o,,&q
- C# multipart/form-data提交文件和参数public static string PostJsonData(string
- 本文主要介绍了Android studio利用gradle打jar包并混淆的方法,下面话不多说,来看看详细的介绍吧。首先打jar包的配置很简
- 本文实例讲述了Android游戏开发学习②焰火绽放效果实现方法。分享给大家供大家参考。具体如下:本节介绍在游戏开发中常用到的数学物理应用——