Java 根据某个 key 加锁的实现方式
作者:明明如月学长 发布时间:2023-10-27 07:54:20
一、背景
日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
大家可以借助每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁;每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。
二、参考代码
接口定义
public interface LockByKey<T> {
/**
* 加锁
*/
void lock(T key);
/**
* 解锁
*/
void unlock(T key);
}
2.1 同一个 key 只能一个线程执行
2.1.1 代码实现
每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
public class DefaultLockByKeyImpl<T> implements LockByKey<T> {
private final Map<T, ReentrantLock> lockMap = new ConcurrentHashMap<>();
/**
* 加锁
*/
@Override
public void lock(T key) {
// 如果key为空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能为空");
}
// 获取或创建一个ReentrantLock对象
ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
// 获取锁
lock.lock();
}
/**
* 解锁
*/
@Override
public void unlock(T key) {
// 如果key为空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能为空");
}
// 从Map中获取锁对象
ReentrantLock lock = lockMap.get(key);
// 获取不到报错
if (lock == null) {
throw new IllegalArgumentException("key " + key + "尚未加锁");
}
// 其他线程非法持有不允许释放
if (!lock.isHeldByCurrentThread()) {
throw new IllegalStateException("当前线程尚未持有,key:" + key + "的锁,不允许释放");
}
lock.unlock();
}
}
注意事项:
(1)参数合法性校验
(2)解锁时需要判断该锁是否为当前线程持有
2.1.2 编写单测
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class DefaultLockByKeyImplTest {
private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>();
private final CountDownLatch countDownLatch = new CountDownLatch(7);
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Test
public void test() throws InterruptedException {
List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
Set<String> executingKeySet = new HashSet<>();
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
int finalI = i;
executorService.submit(() -> {
lockByKey.lock(key);
if (executingKeySet.contains(key)) {
throw new RuntimeException("存在正在执行的 key:" + key);
}
executingKeySet.add(key);
try {
System.out.println("index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName());
lockByKey.unlock(key);
executingKeySet.remove(key);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
}
如果同一个 key 没释放能够再次进入,会抛出异常。
也可以通过日志来观察执行情况:
index:0对 [a] 加锁 ->pool-1-thread-1
index:6对 [d] 加锁 ->pool-1-thread-7
index:4对 [c] 加锁 ->pool-1-thread-5
index:3对 [b] 加锁 ->pool-1-thread-4
index:6释放 [d] ->pool-1-thread-7
index:4释放 [c] ->pool-1-thread-5
index:0释放 [a] ->pool-1-thread-1
index:3释放 [b] ->pool-1-thread-4
index:1对 [a] 加锁 ->pool-1-thread-2
index:5对 [b] 加锁 ->pool-1-thread-6
index:1释放 [a] ->pool-1-thread-2
index:5释放 [b] ->pool-1-thread-6
index:2对 [a] 加锁 ->pool-1-thread-3
index:2释放 [a] ->pool-1-thread-3
2.2、同一个 key 可以有 n个线程执行
2.2.1 代码实现
每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。
import lombok.SneakyThrows;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
public class SimultaneousEntriesLockByKey<T> implements LockByKey<T> {
private final Map<T, Semaphore> semaphores = new ConcurrentHashMap<>();
/**
* 最大线程
*/
private int allowed_threads;
public SimultaneousEntriesLockByKey(int allowed_threads) {
this.allowed_threads = allowed_threads;
}
/**
* 加锁
*/
@Override
public void lock(T key) {
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v);
semaphore.acquireUninterruptibly();
}
/**
* 解锁
*/
@Override
public void unlock(T key) {
// 如果key为空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能为空");
}
// 从Map中获取锁对象
Semaphore semaphore = semaphores.get(key);
if (semaphore == null) {
throw new IllegalArgumentException("key " + key + "尚未加锁");
}
semaphore.release();
if (semaphore.availablePermits() >= allowed_threads) {
semaphores.remove(key, semaphore);
}
}
2.2.2 测试代码
import com.google.common.collect.Lists;
import org.junit.Test;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimultaneousEntriesLockByKeyTest {
private final int maxThreadEachKey = 2;
private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);
private final CountDownLatch countDownLatch = new CountDownLatch(7);
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Test
public void test() throws InterruptedException {
List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>());
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
int finalI = i;
executorService.submit(() -> {
lockByKey.lock(key);
executingKeyCount.compute(key, (k, v) -> {
if (v != null && v + 1 > maxThreadEachKey) {
throw new RuntimeException("超过限制了");
}
return v == null ? 1 : v + 1;
});
try {
System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1));
lockByKey.unlock(key);
executingKeyCount.compute(key, (k, v) -> v - 1);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
}
输出:
time:2023-03-15T20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
time:2023-03-15T20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
time:2023-03-15T20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
time:2023-03-15T20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
time:2023-03-15T20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
time:2023-03-15T20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
time:2023-03-15T20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
time:2023-03-15T20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
time:2023-03-15T20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
time:2023-03-15T20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
time:2023-03-15T20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
time:2023-03-15T20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
time:2023-03-15T20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
time:2023-03-15T20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0
三、总结
本文结合自己的理解和一些参考代码,给出自己的示例,希望对大家有帮助。
来源:https://blog.csdn.net/w605283073/article/details/129568863


猜你喜欢
- 本文实例讲述了Android通过json向MySQL中写入数据的方法。分享给大家供大家参考,具体如下:先说一下如何通过json将Androi
- 在上篇文章给大家介绍了Mybatis中#{}和${}传参的区别及#和$的区别小结,如果大家有需要可以参考下。$和#简单说明:#相当于对数据
- 本文实例为大家分享了Android实现背景图滑动变大松开回弹的具体代码,供大家参考,具体内容如下原图放大后1、自定义view继承Scroll
- 在Activity之间传递数据还可以利用一些技巧,不管windows还是Linux操作系统,都会支持一种叫剪切板的技术,也就是某一个程序将一
- 本文实例为大家分享了Unity实现截图功能的具体代码,供大家参考,具体内容如下一、使用Unity自带APIusing UnityEngine
- 前言之前看到某公司的官网的文章的浏览量刷新一次网页就会增加一次,给人的感觉不太好,一个公司的官网给人如此直白的漏洞,我批量发起请求的时候发现
- 前言假如你做了一个云盘类的app,或者可以保存用户导入的配置。用户在未来肯定需要获取这些文件,一个办法是写一个Activity,向一个文件管
- 先来看看效果:一、添加依赖库的步骤1.项目的gradle文件内的做以下改动allprojects { repositories
- 因为目前工程无法使用第三方,只能搞一个 * 缓存了 * 缓存分为内存缓存,本地缓存,网络缓存;缓存的步骤依次是网络,内存,本地,然后取的顺序为内
- 标准c++中string类函数介绍注意不是CString之所以抛弃char*的字符串而选用C++标准程序库中的string类,是因为他和前者
- 在传统的单服务架构中,一般来说,只有一个服务器,那么不存在 Session共享问题,但是在分布式/集群项目中,Session 共享则是一个必
- Java爬取图片现在开始学习爬虫,对于爬虫的入门来说,图片相对来说是比较容易获取的,因为大部分图片都不是敏感数据,所以不会遇到什么反爬措施,
- springboot项目启动慢的问题排查springboot项目,随着时间的推移,启动耗时逐步增加,从几分钟慢慢的达到30多分钟,有点恐怖!
- springboot扩展MVC自定义 config -> SpringMvcConfig.java下边就是扩展springMVC的模板
- 本文基于GP58系列,它可以兼容ESC/POS指令集,对EPSON的打印机通用.Android下的设备调试,如果设备提供了驱动,按照厂家的驱
- 当一个列表项目很多,并且每个项目可以进入到其它Activity或者Fragment时,保存之前列表的位置是一个比较不错的功能,今天研究了一下
- 本文实例讲述了C# linq查询之动态OrderBy用法。分享给大家供大家参考。具体分析如下:groupList是原始数据集合,List&l
- 本文实例讲述了c#与js随机数生成方法。分享给大家供大家参考。具体如下:1. C#产生随机数方法:Random rd = new Rando
- 本文实例讲述了C#使用xsd文件验证XML格式是否正确的实现方法。分享给大家供大家参考,具体如下://创建xmlDocumentXmlDoc
- SpringBoot Data JPA实现 一对多、多对一关联表查询开发环境IDEA 2017.1Java1.8SpringBoot 2.0