SpringBoot整合Redisson实现分布式锁
作者:fastjson_ 发布时间:2021-08-01 12:04:35
Redisson是架设在redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
Github地址:https://github.com/redisson/redisson
一、添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- springboot整合redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- springboot整合redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
</dependencies>
二、redis配置文件
server:
port: 8000
spring:
redis:
host: localhost
port: 6379
password: null
database: 1
timeout: 30000
三、新建配置类
@Configuration
public class MyRedissonConfig {
@Value("${spring.redis.host}")
String redisHost;
@Value("${spring.redis.port}")
String redisPort;
@Value("${spring.redis.password}")
String redisPassword;
@Value("${spring.redis.timeout}")
Integer redisTimeout;
/**
* Redisson配置
* @return
*/
@Bean
RedissonClient redissonClient() {
//1、创建配置
Config config = new Config();
redisHost = redisHost.startsWith("redis://") ? redisHost : "redis://" + redisHost;
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(redisHost + ":" + redisPort)
.setTimeout(redisTimeout);
if (StringUtils.isNotBlank(redisPassword)) {
serverConfig.setPassword(redisPassword);
}
return Redisson.create(config);
}
}
//单机
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
//主从
Config config = new Config();
config.useMasterSlaveServers()
.setMasterAddress("127.0.0.1:6379")
.addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
.addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
//哨兵
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
.addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
.addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
//集群
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
.addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
四、使用分布式锁
可重入锁
基于Redis的Redisson分布式可重入锁RLock对象实现了java.util.concurrent.locks.Lock接口。
@RequestMapping("/redisson")
public String testRedisson(){
//获取分布式锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getLock("lock");
//加锁(阻塞等待),默认过期时间是无限期
lock.lock();
try{
//如果业务执行过长,Redisson会自动给锁续期
Thread.sleep(1000);
System.out.println("加锁成功,执行业务逻辑");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//解锁,如果业务执行完成,就不会继续续期
lock.unlock();
}
return "Hello Redisson!";
}
如果拿到分布式锁的节点宕机,且这个锁正好处于锁住的状态时,会出现锁死的状态,为了避免这种情况的发生,锁都会设置一个过期时间。这样也存在一个问题,一个线程拿到了锁设置了30s超时,在30s后这个线程还没有执行完毕,锁超时释放了,就会导致问题,Redisson给出了自己的答案,就是 watch dog 自动延期机制。
Redisson提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期,也就是说,如果一个拿到锁的线程一直没有完成逻辑,那么看门狗会帮助线程不断的延长锁超时时间,锁不会因为超时而被释放。
默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson 还提供了可以指定leaseTime参数的加锁方法来指定加锁的时间。超过这个时间后锁便自动解开了,不会延长锁的有效期。
在RedissonLock类的renewExpiration()方法中,会启动一个定时任务每隔30/3=10秒给锁续期。如果业务执行期间,应用挂了,那么不会自动续期,到过期时间之后,锁会自动释放。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
另外Redisson还提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
如果指定了锁的超时时间,底层直接调用lua脚本,进行占锁。如果超过leaseTime,业务逻辑还没有执行完成,则直接释放锁,所以在指定leaseTime时,要让leaseTime大于业务执行时间。RedissonLock类的tryLockInnerAsync()方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
读写锁
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。在读写锁中,读读共享、读写互斥、写写互斥。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
读写锁测试类,当访问write接口时,read接口会被阻塞住。
@RestController
public class TestController {
@Autowired
RedissonClient redissonClient;
@Autowired
StringRedisTemplate redisTemplate;
@RequestMapping("/write")
public String write(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock writeLock = readWriteLock.writeLock();
String s = UUID.randomUUID().toString();
writeLock.lock();
try {
redisTemplate.opsForValue().set("wr-lock-key", s);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
}
return s;
}
@RequestMapping("/read")
public String read(){
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("wr-lock");
RLock readLock = readWriteLock.readLock();
String s = "";
readLock.lock();
try {
s = redisTemplate.opsForValue().get("wr-lock-key");
} finally {
readLock.unlock();
}
return s;
}
}
信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
类似的接口和用法
关于信号量的使用你们能够想象一下这个场景,有三个停车位,当三个停车位满了后,其余车就不停了。能够把车位比做信号,如今有三个信号,停一次车,用掉一个信号,车离开就是释放一个信号。
咱们用 Redisson 来演示上述停车位的场景。
先定义一个占用停车位的方法:
/**
* 停车,占用停车位
* 总共 3 个车位
*/
@ResponseBody
@RequestMapping("park")
public String park() throws InterruptedException {
// 获取信号量(停车场)
RSemaphore park = redisson.getSemaphore("park");
// 获取一个信号(停车位)
park.acquire();
return "OK";
}
再定义一个离开车位的方法:
/**
* 释放车位
* 总共 3 个车位
*/
@ResponseBody
@RequestMapping("leave")
public String leave() throws InterruptedException {
// 获取信号量(停车场)
RSemaphore park = redisson.getSemaphore("park");
// 释放一个信号(停车位)
park.release();
return "OK";
}
为了简便,我用 Redis 客户端添加了一个 key:“park”,值等于 3,表明信号量为 park,总共有三个值。
而后用 postman 发送 park 请求占用一个停车位。
而后在 redis 客户端查看 park 的值,发现已经改成 2 了。继续调用两次,发现 park 的等于 0,当调用第四次的时候,会发现请求一直处于等待中
,说明车位不够了。若是想要不阻塞,能够用 tryAcquire 或 tryAcquireAsync。
咱们再调用离开车位的方法,park 的值变为了 1,表明车位剩余 1 个。
注意:屡次执行释放信号量操做,剩余信号量会一直增长,而不是到 3 后就封顶了。
闭锁(CountDownLatch)
CountDownLatch作用:某一线程,等待其他线程执行完毕之后,自己再继续执行。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
在TestController中添加测试方法,访问close接口时,调用await()方法进入阻塞状态,直到有三次访问release接口时,close接口才会返回。
@RequestMapping("/close")
public String close() throws InterruptedException {
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.trySetCount(3);
close.await();
return "close";
}
@RequestMapping("/release")
public String release(){
RCountDownLatch close = redissonClient.getCountDownLatch("close");
close.countDown();
return "release";
}
来源:https://blog.csdn.net/bbj12345678/article/details/121144955
猜你喜欢
- 前言本文主要给大家介绍了关于Java读取二进制文件的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。读Hex写CS
- synchronized关键字synchronized,我们谓之锁,主要用来给方法、代码块加锁。当某个方法或者代码块使用synchroniz
- 简介我们知道在native的代码中有很多指针,这些指针在JNA中被映射成为Pointer。除了Pointer之外,JNA还提供了更加强大的M
- 异常处理机制1、抛出异常2、捕获异常3、异常处理五个关键字:try、catch、finally、throw、throws注意:假设要捕获多个
- 接触过Android开发的同学们都知道在Android中访问程序资源基本都是通过资源ID来访问。这样开发起来很简单,并且可以不去考虑各种分辨
- Java事件处理机制和适配器最重要的是理解事件源,监视器,处理事件的接口的概念。1.事件源:是能够产生时间的对象都可以叫事件源,比如文本框,
- Java中的wait/notify/notifyAll可用来实现线程间通信,是Object类的方法,这三个方法都是native方法,是平台相
- 这篇文章主要介绍了Java内存模型可见性问题相关解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友
- filter自定义过滤器 增加了 对验证码的校验package com.youxiong.filter;import com.y
- 本文实例讲述了Java矩阵连乘问题(动态规划)算法。分享给大家供大家参考,具体如下:问题描述:给定n个矩阵:A1,A2,...,An,其中A
- JWT可以理解为一个加密的字符串,里面由三部分组成:头部(Header)、负载(Payload)、签名(signature)由base64加
- 前言反射和注解在java中偏高级用法,一般在各种框架中被广泛应用,文章简单介绍下反射和注解的用法,希望对你的工作学习有一定帮助java注解什
- 本文实例为大家分享了Java实现五子棋网络版的具体代码,供大家参考,具体内容如下需求分析:对于网络五子棋而言,在普通五子棋的基础上需要添加以
- 前言我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以
- 前言在RequestMappingHandlerAdapter对request进行了适配,并且调用了目标handler之后,其会返回一个Mo
- 在 C# 9 中,foreach 循环可以使用扩展方法。在本文中,我们将通过例子回顾 C# 9 中如何扩展 foreach 循环。代码演示下
- 一、Java类加载机制1.概述 Class文件由类装载器装
- 首先,必须要强调的一点,MD5不是加密算法,而是消息摘要算法,具有不可逆性。字符串通过MD5处理后会生成128位的二进制串。我们通常会将其转
- 背景在 Java 中实现线程安全的传统方式是 synchronized 关键字,虽然它提供了一定的同步能力,但它在使用上
- 目录Web服务器技术讲解PHP:JSP/ServletWeb服务器IISTomcatJAVA jdk中的内容TomcatTomcat根目录下