Redisson公平锁的源码解读分享
作者:王谷雨 发布时间:2022-12-13 01:20:40
前言
我在上一篇文章聊了Redisson的分布式锁,这次继续来聊聊Redisson的公平锁。下面是官方原话:
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
源码版本:3.17.7
这是我 fork 的分支,添加了自己理解的中文注释:https://github.com/xiaoguyu/redisson
公平锁
先上官方例子:
RLock fairLock = redisson.getFairLock("anyLock");
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
因为在Redisson中,公平锁和普通可重入锁的逻辑大体上一样,我在上一篇文章都介绍了,这里就不再赘述。下面开始介绍合理逻辑。
加锁
加锁的 lua 脚本在 RedissonFairLock#tryLockInnerAsync方法中
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
long wait = threadWaitTime;
if (waitTime > 0) {
wait = unit.toMillis(waitTime);
}
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
......
}
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// remove stale threads
"while true do " + // list为空,证明没有人排队,退出循环
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set
// NOTE we do not alter any other timeout
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
// check if the lock can be acquired now
// 检查是否可以获取锁。如果hash和list都不存在,或者线程队列的第一个是当前线程,则可以获取锁
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 都获取锁了,当然要从线程队列和时间队列中移除
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
// 刷新时间集合中的时间
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
// 和公平锁的设置一样,值加1并且设置过期时间
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// check if the lock is already held, and this is a re-entry
// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
// the lock cannot be acquired
// check if the thread is already in the queue
// 时间集合中有值,证明线程已经在队列中,不需要往后执行逻辑了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
// 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
// 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
// the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the
// threadWaitTime
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
// 否则直接获取锁的存活时间
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果添加到时间集合成功,则同时添加线程集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime);
}
throw new IllegalArgumentException();
}
公平锁总共用了Redis的三种数据类型,对应着 lua 脚本里面的keys1、2、3的参数:
KEYS[1]
锁的名字,使用 Hash 数据类型,是可重入锁的基础,结构为 {”threadId1”: 1, “thread2”: 1},key为线程id,value是锁的次数
KEYS[2]
线程队列的名字,使用 List 数据类型,结构为 [ “threadId1”, “threadId2” ],按顺序存放需要获取锁的线程的id
KEYS[3]
时间队列的名字,使用 sorted set 数据类型,结构为 {”threadId2”:123, “threadId1”:190},key为线程id,value为获取锁的超时时间戳
我下面会用 锁、线程队列、时间队列 来表示这3个数据结构,需要注意下我的表述。
同样的,介绍下参数:
ARGV[1]:leaseTime 锁的持有时间
ARGV[2]:线程id(描述不太准确,暂时按这样理解)
ARGV[3]:waitTime 尝试获取锁的最大等待时间
ARGV[4]:currentTime 当前时间戳
接下来,我们一段一段分析 lua 脚本,首先看最开始的 while 循环
"while true do " + // list为空,证明没有人排队,退出循环
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 能到这里,证明有人排队,拿出在排队的第一个人的超时时间,如果超时了,则移除相应数据
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
"if timeout <= tonumber(ARGV[4]) then " +
// 从时间队列和线程队列中移除
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
具体的逻辑我在注释中写的很清楚了,看的时候记住 KEYS[2]、KEYS[3] 对应着线程队列和时间队列接口。主要注意的是,线程队列只有当一个线程持有锁,另一个线程获取不到锁时,才会有值(前面有人才排队,没人排什么队)。接着看第二段
// 检查是否可以获取锁。当锁不存在,并且线程队列不存在或者线程队列第一位是当前线程,则可以获取锁
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 都获取锁了,当然要从线程队列和时间队列中移除
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
// 刷新时间队列中的时间
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
// 和公平锁的设置一样,值加1并且设置过期时间
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
翻译翻译就是,锁不存在(别人没有持有锁)并且线程队列不存在或者线程队列第一位是当前线程(不用排队或者自己排第一)才能获得锁。因为时间队列中存放的是各个线程等待锁的超时时间戳,所以每次都需要刷新下。继续下一段逻辑
// 能到这里,证明前面拿不到锁,但是也要做可重入锁的处理
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
这是可重入锁的处理,继续下一段
// 时间队列中有值,证明线程已经在队列中,不需要往后执行逻辑了
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread
// in the queue, but this is approximately correct, and
// avoids having to traverse the queue
// 因为下面的timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
// 所以这里的ttl = timeout - tonumber(ARGV[3]) - tonumber(ARGV[4])
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
举例子:线程1持有锁,线程2尝试第一次获取锁(不进入这段if),线程2第二次获取锁(进入了这段if)。继续下一段
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
// 如果最后一个线程不是当前线程,则从时间集合取出(举例:线程1/2/3按顺序获取锁,此时pttl得到的是线程1的锁过期时间,zscore拿到的是线程2的锁的过期时间,此时线程3应该以线程2的为准)
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
// 否则直接获取锁的存活时间
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
// 过期时间 = 锁存活时间 + 等待时间 + 当前时间戳
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
// 如果添加到时间集合成功,则同时添加线程集合
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
ttl 这段的获取逻辑,翻译翻译就是,如果前面有人排队,就以前面的超时时间为准,如果没人排队,就拿锁的超时时间。获取到 ttl ,就对添加到线程集合和时间集合。
来源:https://www.cnblogs.com/konghuanxi/p/16869998.html


猜你喜欢
- 一、概述热修复这项技术,基本上已经成为项目比较重要的模块了。主要因为项目在上线之后,都难免会有各种问题,而依靠发版去修复问题,成本太高了。现
- Tomcat 如何实现WebSocketWebSocket协议属于HTML5标准,越来越多浏览器已经原生支持WebSocket,它能让客户端
- 本文实例讲述了Android DigitalClock组件用法。分享给大家供大家参考,具体如下:DigitalClock组件的使用很简单,先
- 前言static表示“全局”或者“静态”的意思,用来修饰成员变量和成员方法,也可以形成静态static代码块,但是Java语言中没有全局变量
- 在Springboot中默认的静态资源路径有:classpath:/METAINF/resources/,classpath:/resour
- 首先对Servlet上传文件的简单理解此前,Servlet本身没有对文件上传提供直接的支持,一般需要使用第三方框架来实现,这样就比较麻烦不过
- 特别是针对循环或timer处理中需要在窗体控件显示数据时,因后台处理过度繁忙而出现没刷新或者假死现象时,可以使用Application.Do
- HTTPS是HTTP的安全版本,旨在提供数据传输层安全性(TLS)。当你的应用不使用HTTP协议的时候,浏览器地址栏就会出现一个不安全的提示
- 前言对于数组遍历,基本上每个开发者都写过,遍历本身没什么好说的,但是当我们在遍历的过程中,有一些复杂的业务逻辑时,将会发现代码的层级会逐渐加
- 自动去除图像扫描黑边/// <summary>  
- 认识数组数组的定义数组是相同类型数据的有序集合。数组描述的是相同类型的若干个数据,按照一定的先后次序排列组合而成。其中,每一个数据称作一个元
- 1.设置url-pattern为*.do(最为常见的方式)只要你的请求url中包含配置的url-pattern,该url就可以到达Dispa
- Java语言的垃圾回收1.垃圾回收机制的基本概念问:1.什么是Java垃圾回收?答:在Java语言的生命周期中,Java运行环境提供了一个系
- 在Java中,泛型的引入是为了在编译时提供强类型检查和支持泛型编程。为了实现泛型,Java编译器应用类型擦除实现: &
- ThreadLocal,直译为“线程本地”或“本地线程”,如果你真的这么认为,那就错了!其实,它就是一个容器,用于存放线程的局部变量,我认为
- @RequestBody不能class类型匹配在首次第一次尝试使用@RequestBody注解开始加载字符串使用post提交(貌似只能pos
- 因为公司现在换成了nacos,所以自己写了demo学习一下。结果第一步就走不下去。在使用nacos-config读取nacos配置时。发现b
- 熟悉Eclipse的都知道Eclipse经常性的会出现一些莫名其妙的问题,有时候运行的好好的突然重启一下项目就莫名的报错,所以经常会用到cl
- 1.引言在开发过程中,我们经常会遇到需要显示或隐藏View视图的情况,如果在隐藏或显示View的过程中加上动画,能让交互更加的友好和动感,本
- 一、说明标准库提供了许多容器,它们有一个共同点:它们是同类的。也就是说,标准库中的容器只能存储一种类型的元素。 std::vector<