InterProcessMutex实现zookeeper分布式锁原理
作者:冬雪是你 发布时间:2023-08-11 05:46:43
标签:InterProcessMutex,zookeeper,分布式锁
原理简介:
zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且其他节点就会监听序号比自己小的节点,一旦序号比自己小的节点被删除了,其他节点就会得到相应的事件,然后查看自己是否为序号最小的节点,如果是,则获取锁。
zookeeper节点图分析
InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的。
InterProcessMutex实现的InterProcessLock接口,InterProcessLock主要规范了如下几个方法:
// 获取互斥锁
public void acquire() throws Exception;
// 在给定的时间内获取互斥锁
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 释放锁处理
public void release() throws Exception;
// 如果此JVM中的线程获取了互斥锁,则返回true
boolean isAcquiredInThisProcess();
接下来我们看看InterProcessMutex中的实现,它究竟有哪些属性,以及实现细节
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
// LockInternals是真正实现操作zookeeper的类,它内部包含连接zookeeper客户端的CuratorFramework
// LockInternals的具体实现后面我会讲到
private final LockInternals internals;
// basePath是锁的根结点,所有的临时有序的节点都是basePath的子节点,
private final String basePath;
//
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
// LockData封装了请求对应的线程(owningThread)、锁的重入的次数(lockCount)、线程对应的临时节点(lockPath)
private static class LockData {
final Thread owningThread;
final String lockPath;
// 原子性的
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
private static final String LOCK_NAME = "lock-";
// 获取互斥锁,阻塞【InterProcessLock的实现】
@Override
public void acquire() throws Exception {
// 获取锁,一直等待
if ( !internalLock(-1, null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
// 获取互斥锁,指定时间time【InterProcessLock的实现】
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
// 当前线程是否占用锁中【InterProcessLock的实现】
@Override
public boolean isAcquiredInThisProcess() {
return (threadData.size() > 0);
}
//如果调用线程与获取互斥锁的线程相同,则执行一次互斥锁释放。如果线程已多次调用acquire,当此方法返回时,互斥锁仍将保留 【InterProcessLock的实现】
@Override
public void release() throws Exception {
Thread currentThread = Thread.currentThread(); //当前线程
LockData lockData = threadData.get(currentThread); //线程对应的锁信息
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// 因为获取到的锁是可重入的,对lockCount进行减1,lockCount=0时才是真正释放锁
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 到这里时lockCount=0,具体释放锁的操作交给LockInternals中的releaseLock方法实现
internals.releaseLock(lockData.lockPath);
}
finally {
threadData.remove(currentThread);
}
}
// 获取basePath根结点下的所有临时节点的有序集合
public Collection<String> getParticipantNodes() throws Exception {
return LockInternals.getParticipantNodes(internals.getClient(), basePath, internals.getLockName(), internals.getDriver());
}
boolean isOwnedByCurrentThread() {
LockData lockData = threadData.get(Thread.currentThread());
return (lockData != null) && (lockData.lockCount.get() > 0);
}
protected String getLockPath() {
LockData lockData = threadData.get(Thread.currentThread());
return lockData != null ? lockData.lockPath : null;
}
// acquire()中调用的internalLock()方法
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null ) {
// 如果当前线程已经获取到了锁,那么将重入次数lockCount+1,返回true
lockData.lockCount.incrementAndGet();
return true;
}
// attemptLock方法是获取锁的真正实现,lockPath是当前线程成功在basePath下创建的节点,若lockPath不为空代表成功获取到锁
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null ) {
// lockPath封装到当前线程对应的锁信息中
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
}
接下来我们看看InterProcessMutex中使用的LockInternals类的实现细节
public class LockInternals {
private final CuratorFramework client; // 连接zookeeper的客户端
private final String path;// 等于basePath,InterProcessMutex中传进来的
private final String basePath; // 根结点
private final LockInternalsDriver driver; // 操作zookeeper节点的driver
private final String lockName; // "lock-"
private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
private final CuratorWatcher revocableWatcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
checkRevocableWatcher(event.getPath());
}
}
};
// 监听节点的 * ,若被监听的节点有动静,则唤醒 notifyFromWatcher()=>notifyAll();
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
private volatile int maxLeases;
// 获取basePath的子节点,排序后的
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
List<String> children = client.getChildren().forPath(basePath);
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
// 尝试获取锁【internalLock=>attemptLock】
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{// 开始时间
final long startMillis = System.currentTimeMillis();
// 记录等待时间
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
// 重试次数
int retryCount = 0;
// 当前节点
String ourPath = null;
// 是否获取到锁的标志
boolean hasTheLock = false;
// 是否放弃获取到标志
boolean isDone = false;
// 不停尝试获取
while ( !isDone )
{
isDone = true;
try
{// 创建当前线程对应的节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// internalLockLoop中获取
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{// 是否可再次尝试
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
// 获取到锁后,返回当前线程对应创建的节点路径
if ( hasTheLock )
{
return ourPath;
}
return null;
}
// 循环获取【attemptLock=>internalLockLoop】
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false; // 是否拥有分布式锁
boolean doDelete = false;// 是否需要删除当前节点
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 循环尝试获取锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{// 得到basePath下排序后的临时子节点
List<String> children = getSortedChildren();
// 获取之前创建的当前线程对应的子节点
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断是否获取到锁,没有就返回监听路径
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
// 成功获取到
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{// 没有获取到锁,监听前一个临时顺序节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// 上一个临时顺序节点如果被删除,会唤醒当前线程继续竞争锁
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 获取锁超时
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
// 因为获取锁超时,所以删除之前创建的临时子节点
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private void deleteOurPath(String ourPath) throws Exception {
try
{
// 删除
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
}
StandardLockInternalsDriver implements LockInternalsDriver
// 前面internalLockLoop方法中driver.getsTheLock执行的方法
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
// 获取子节点在临时顺序节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
// 检验子节点在临时顺序节点列表中是否有效
validateOurIndex(sequenceNodeName, ourIndex);
// 若当前子节点的位置<maxLeases,代表可获取锁【maxLeases默认=1,若ourIndex=0,代笔自己位置最小】
boolean getsTheLock = ourIndex < maxLeases;
// getsTheLock=true,则不需要监听前maxLeases的节点【maxLeases默认=1,代表监听前面最靠近自己的节点】
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
用InterProcessMutex在自己业务实现分布式锁,请点击此链接阅读点我
来源:https://blog.csdn.net/m0_45097637/article/details/123591672


猜你喜欢
- 对于服务器端开发人员而言,调用第三方接口获取数据,将其“代理”转化并返给客户端几乎是家常便
- ADB全称Android Debug Bridge, 是android sdk里的一个工具, 用这个工具可以直接操作管理android模拟器
- 本文实例讲述了C#实现winform中RichTextBox在指定光标位置插入图片的方法。分享给大家供大家参考,具体如下://获取RichT
- 本文实例讲述了winform中的ListBox和ComboBox绑定数据用法。分享给大家供大家参考。具体实现方法如下:本例实现将集合数据绑定
- 首先来一道思考题:String str1 = "111111";String str2 = "222222&q
- 我们在使用SpringBoot进行测试的时候一般是需要加两个注解:@SpringBootTest目的是加载ApplicationContex
- 1 本地调试介绍本地调试: 这里是指在开发环境中,部署了一整套的某个项目或者产品的服务,开发人员开发时,本地会起一个或多个服务,这些服务和开
- 详解android 通过uri获取bitmap图片并压缩很多人在调用图库选择图片时会在onActivityResult中用Media.get
- 本文实例为大家分享了Java基于Socket实现简易版多人聊天室的具体代码,供大家参考,具体内容如下一、 聊天室需求1、一个服务端,多个客户
- 每天上下楼都是乘坐电梯的,就想电梯的工作原理是什么呢?于是自己写了个控制台程序来模拟一下电梯的工作原理!采用面向对象的编程思想!将电梯拆解为
- 调取钉钉考勤接口的功能公司需要做一个钉钉考勤的页面,让我去写这个功能。结果却比我想象的要麻烦一些!具体是怎么个麻烦呢下面直入正题首先我们找到
- 一,简介Feign使得 Java HTTP 客户端编写更方便。Feign 灵感来源于Retrofit、JAXRS-2.0和WebSocket
- 本文实例讲述了Android开发之imageView图片按比例缩放的实现方法。分享给大家供大家参考,具体如下:android:scaleTy
- 前言:发现用Winform做一个圆角按钮遇到麻烦,主要是锯齿问题,后面想了想办法解决问题了。主要方法是按钮的区域通过Region指定,但按钮
- 不久前项目开始了一段时间了,刚开始怀疑是Android Studio中新加入的Instant Run功能引起的,于是重新打release包后
- 主线程和子线程的区别每个线程都有一个唯一标示符,来区分线程中的主次关系的说法。 线程唯一标示符:Thread.CurrentThread.M
- 目录直播界面滑动隐藏效果用户交互页实现礼物进入时动画礼物移出动画开启定时清理礼物列表直播界面实现的是播放本地的视频文件:/** * 直播界面
- 带你手把手,用 java swing实现抖音上的表白程序1.准备工作a.需要下载一个带着swing插件的eclipseb.需要配置好JDKc
- 公司app要求做一个扭蛋功能,其实就是一个可拖动层叠卡片列表,原理还是由一个自定义Recyclerview和LayoutManager来实现
- 目录二维码生成原理(即工作原理)效果图如下:前提:源码如下:总结二维码生成原理(即工作原理)二维码官方叫版本Version。Version