redisson实现分布式锁原理
作者:min.jiang 发布时间:2023-11-29 00:00:00
Redisson分布式锁
之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。
不同版本实现锁的机制并不相同
引用的redisson最近发布的版本3.2.3,不同的版本可能实现锁的机制并不相同,早期版本好像是采用简单的setnx,getset等常规命令来配置完成,而后期由于redis支持了脚本Lua变更了实现原理。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.2.3</version>
</dependency>
setnx需要配合getset以及事务来完成,这样才能比较好的避免死锁问题,而新版本由于支持lua脚本,可以避免使用事务以及操作多个redis命令,语义表达更加清晰一些。
RLock接口的特点
继承标准接口Lock
拥有标准锁接口的所有特性,比如lock,unlock,trylock等等。
扩展标准接口Lock
扩展了很多方法,常用的主要有:强制锁释放,带有效期的锁,还有一组异步的方法。其中前面两个方法主要是解决标准lock可能造成的死锁问题。比如某个线程获取到锁之后,线程所在机器死机,此时获取了锁的线程无法正常释放锁导致其余的等待锁的线程一直等待下去。
可重入机制
各版本实现有差异,可重入主要考虑的是性能,同一线程在未释放锁时如果再次申请锁资源不需要走申请流程,只需要将已经获取的锁继续返回并且记录上已经重入的次数即可,与jdk里面的ReentrantLock功能类似。重入次数靠hincrby命令来配合使用,详细的参数下面的代码。
怎么判断是同一线程?
redisson的方案是,RedissonLock实例的一个guid再加当前线程的id,通过getLockName返回
public class RedissonLock extends RedissonExpirable implements RLock {
final UUID id;
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);
this.commandExecutor = commandExecutor;
this.id = id;
}
String getLockName(long threadId) {
return this.id + ":" + threadId;
}
RLock获取锁的两种场景
这里拿tryLock的源码来看:tryAcquire方法是申请锁并返回锁有效期还剩余的时间,如果为空说明锁未被其它线程申请直接获取并返回,如果获取到时间,则进入等待竞争逻辑。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit);
if(ttl == null) {
//直接获取到锁
return true;
} else {
//有竞争的后续看
}
}
无竞争,直接获取锁
先看下首先获取锁并释放锁背后的redis都在做什么,可以利用redis的monitor来在后台监控redis的执行情况。当我们在方法了增加@RequestLockable之后,其实就是调用lock以及unlock,下面是redis命令:
加锁
由于高版本的redis支持lua脚本,所以redisson也对其进行了支持,采用了脚本模式,不熟悉lua脚本的可以去查找下。执行lua命令的逻辑如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});
}
加锁的流程:
判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。
判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。
被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
return redis.call('pttl', KEYS[1]);"
"1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
上面的lua脚本会转换成真正的redis命令,下面的是经过lua脚本运算之后实际执行的redis命令。
1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"
1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"
解锁
解锁的流程看起来复杂些:
如果lock键不存在,发消息说锁已经可用
如果锁不是被当前线程锁定,则返回nil
由于支持可重入,在解锁时将重入次数需要减1
如果计算后的重入次数>0,则重新设置过期时间
如果计算后的重入次数<=0,则发消息说锁已经可用
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;"
"2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"
"0" "1000"
"346e1eb8-5bfd-4d49-9870-042df402f248:21"
无竞争情况下解锁redis命令:
主要是发送一个解锁的消息,以此唤醒等待队列中的线程重新竞争锁。
1486642678.493691 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642678.493712 [0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0"
有竞争,等待
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令,复杂的在redisson的源码上。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中。
this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败
this.await返回true,进入循环尝试获取锁。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit);
if(ttl == null) {
return true;
} else {
//重点是这段
time -= System.currentTimeMillis() - current;
if(time <= 0L) {
return false;
} else {
current = System.currentTimeMillis();
final RFuture subscribeFuture = this.subscribe(threadId);
if(!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if(!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener() {
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if(subscribeFuture.isSuccess()) {
RedissonLock.this.unsubscribe(subscribeFuture, threadId);
}
}
});
}
return false;
} else {
boolean var16;
try {
time -= System.currentTimeMillis() - current;
if(time <= 0L) {
boolean currentTime1 = false;
return currentTime1;
}
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(leaseTime, unit);
if(ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if(time <= 0L) {
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if(ttl.longValue() >= 0L && ttl.longValue() < time) {
this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
}
}
循环尝试一般有如下几种方法:
while循环,一次接着一次的尝试,这个方法的缺点是会造成大量无效的锁申请。
Thread.sleep,在上面的while方案中增加睡眠时间以降低锁申请次数,缺点是这个睡眠的时间设置比较难控制。
基于信息量,当锁被其它资源占用时,当前线程订阅锁的释放事件,一旦锁释放会发消息通知待等待的锁进行竞争,有效的解决了无效的锁申请情况。核心逻辑是this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch()返回的是一个信号量,有兴趣可以再研究研究。
redisson依赖
由于redisson不光是针对锁,提供了很多客户端操作redis的方法,所以会依赖一些其它的框架,比如netty,如果只是简单的使用锁也可以自己去实现。
来源:http://www.cnblogs.com/ASPNET2008/p/6385249.html


猜你喜欢
- class文件中的attributes_count和attributesattributes_count位于class文件中methods的
- 目录了解程序集如何在C#.NET中加载程序集,模块和引用.NET中的程序集绑定绑定重定向当问题开始发生时故障排除边注References了解
- Spring cloud网关gateway进行websocket路由转发规则配置一、websocket及http路由转发规则配置后端是普通的
- 研究了下这个,记录下代码。主页面代码:activity_main.xml<?xml version="1.0" e
- 在本人用editplus写java文件时碰到的问题。 import java.util.*;class collection{ &
- ElGamal数字签名,供大家参考,具体内容如下一、实验目的学习ElGamal算法在数字签名方面的使用,掌握教科书版本的ElGamal数字签
- 序列化序列化:将对象转换为二进制序列在网络中传输或保存到磁盘反序列化:从网络或磁盘中将二进制序列转换为对象注意:对象必须实现Serializ
- 前言这篇文章主要给大家介绍了关于C#导出pdf的实现方法,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧方法如下:一.接口部
- 程序如下:View Code /* * Hanoi塔游戏 问题描述: * 汉诺塔:汉诺塔(又称河内塔)问
- Android独有的安全机制,除了权限机制外,另外一个就是签名机制了。签名机制主要用在以下两个主要场合起到其作用:升级App和权限检查。升级
- 前言写Android:如何编写“万能”的Activity的这篇文章到现在已经好久了,但是由于最近事情较多,写重构篇的计划就一直被无情的耽搁下
- 本文实例讲述了C#封装的常用文件操作类。分享给大家供大家参考。具体如下:这个C#类封装了我们经常能用到的文件操作方法,包括读写文件、获取文件
- 1,实现方法一:通过给当前界面布局文件的父layout设置点击事件(相当于给整个Activity设置点击事件),在事件里进行键盘隐藏<
- 在前台请求数据的时候,sql语句一直都是打印到控制台的,有一个想法就是想让它打印到日志里,该如何做呢?见下面的mybatis配置文件:<
- 通过上一篇的分析,我们知道了独占模式获取锁有三种方式,分别是不响应线程中断获取,响应线程中断获取,设置超时时间获取。在共享模式下获取锁的方式
- 一、类成员的访问级别public:可由任何代码访问。private(默认):只能由类中的代码访问。internal:只能由它所在的项目(程序
- 1、java 的下载和安装一、安装JDKjava下载网址 或者 点击这里根据自己操作系统和系统位数下载相应的JDK安装
- 在[高并发Java 二] 多线程基础中,我们已经初步提到了基本的线程同步操作。这次要提到的是在并发包中的同步控制工具。1. 各种同步控制工具
- 一、排序与去重日常工作中,总会有一些场景需要对结果集进行一些过滤。比如,与第三方交互后获取的结果集,需要再次排序去重,此时就会根据某个字段来
- JSONArray删除元素的两种方式我自个磨出来的,难受JSONArray jsonarray = new JSONArray();Set&