浅谈Java(SpringBoot)基于zookeeper的分布式锁实现
作者:LJY_SUPER 发布时间:2023-11-16 08:14:56
通过zookeeper实现分布式锁
1、创建zookeeper的client
首先通过CuratorFrameworkFactory创建一个连接zookeeper的连接CuratorFramework client
public class CuratorFactoryBean implements FactoryBean<CuratorFramework>, InitializingBean, DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(ContractFileInfoController.class);
private String connectionString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private RetryPolicy retryPolicy;
private CuratorFramework client;
public CuratorFactoryBean(String connectionString) {
this(connectionString, 500, 500);
}
public CuratorFactoryBean(String connectionString, int sessionTimeoutMs, int connectionTimeoutMs) {
this.connectionString = connectionString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
}
@Override
public void destroy() throws Exception {
LOGGER.info("Closing curator framework...");
this.client.close();
LOGGER.info("Closed curator framework.");
}
@Override
public CuratorFramework getObject() throws Exception {
return this.client;
}
@Override
public Class<?> getObjectType() {
return this.client != null ? this.client.getClass() : CuratorFramework.class;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void afterPropertiesSet() throws Exception {
if (StringUtils.isEmpty(this.connectionString)) {
throw new IllegalStateException("connectionString can not be empty.");
} else {
if (this.retryPolicy == null) {
this.retryPolicy = new ExponentialBackoffRetry(1000, 2147483647, 180000);
}
this.client = CuratorFrameworkFactory.newClient(this.connectionString, this.sessionTimeoutMs, this.connectionTimeoutMs, this.retryPolicy);
this.client.start();
this.client.blockUntilConnected(30, TimeUnit.MILLISECONDS);
}
}
public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public void setClient(CuratorFramework client) {
this.client = client;
}
}
2、封装分布式锁
根据CuratorFramework创建InterProcessMutex(分布式可重入排它锁)对一行数据进行上锁
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, TimeUnit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
释放锁 mutex.release();
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
} else {
try {
this.internals.releaseLock(lockData.lockPath);
} finally {
this.threadData.remove(currentThread);
}
}
}
}
}
封装后的DLock代码
1、调用InterProcessMutex processMutex = dLock.mutex(path);
2、手动释放锁processMutex.release();
3、需要手动删除路径dLock.del(path);
推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public T mutex(String path, ZkLockCallback zkLockCallback, long time, TimeUnit timeUnit)
2、这个无返回结果
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit)
public class DLock {
private final Logger logger;
private static final long TIMEOUT_D = 100L;
private static final String ROOT_PATH_D = "/dLock";
private String lockRootPath;
private CuratorFramework client;
public DLock(CuratorFramework client) {
this("/dLock", client);
}
public DLock(String lockRootPath, CuratorFramework client) {
this.logger = LoggerFactory.getLogger(DLock.class);
this.lockRootPath = lockRootPath;
this.client = client;
}
public InterProcessMutex mutex(String path) {
if (!StringUtils.startsWith(path, "/")) {
path = Constant.keyBuilder(new Object[]{"/", path});
}
return new InterProcessMutex(this.client, Constant.keyBuilder(new Object[]{this.lockRootPath, "", path}));
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback) throws ZkLockException {
return this.mutex(path, zkLockCallback, 100L, TimeUnit.MILLISECONDS);
}
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
String finalPath = this.getLockPath(path);
InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
if (!mutex.acquire(time, timeUnit)) {
throw new ZkLockException("acquire zk lock return false");
}
} catch (Exception var13) {
throw new ZkLockException("acquire zk lock failed.", var13);
}
T var8;
try {
var8 = zkLockCallback.doInLock();
} finally {
this.releaseLock(finalPath, mutex);
}
return var8;
}
private void releaseLock(String finalPath, InterProcessMutex mutex) {
try {
mutex.release();
this.logger.info("delete zk node path:{}", finalPath);
this.deleteInternal(finalPath);
} catch (Exception var4) {
this.logger.error("dlock", "release lock failed, path:{}", finalPath, var4);
// LogUtil.error(this.logger, "dlock", "release lock failed, path:{}", new Object[]{finalPath, var4});
}
}
public void mutex(String path, ZkVoidCallBack zkLockCallback, long time, TimeUnit timeUnit) throws ZkLockException {
String finalPath = this.getLockPath(path);
InterProcessMutex mutex = new InterProcessMutex(this.client, finalPath);
try {
if (!mutex.acquire(time, timeUnit)) {
throw new ZkLockException("acquire zk lock return false");
}
} catch (Exception var13) {
throw new ZkLockException("acquire zk lock failed.", var13);
}
try {
zkLockCallback.response();
} finally {
this.releaseLock(finalPath, mutex);
}
}
public String getLockPath(String customPath) {
if (!StringUtils.startsWith(customPath, "/")) {
customPath = Constant.keyBuilder(new Object[]{"/", customPath});
}
String finalPath = Constant.keyBuilder(new Object[]{this.lockRootPath, "", customPath});
return finalPath;
}
private void deleteInternal(String finalPath) {
try {
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(finalPath);
} catch (Exception var3) {
this.logger.info("delete zk node path:{} failed", finalPath);
}
}
public void del(String customPath) {
String lockPath = "";
try {
lockPath = this.getLockPath(customPath);
((ErrorListenerPathable)this.client.delete().inBackground()).forPath(lockPath);
} catch (Exception var4) {
this.logger.info("delete zk node path:{} failed", lockPath);
}
}
}
@FunctionalInterface
public interface ZkLockCallback<T> {
T doInLock();
}
@FunctionalInterface
public interface ZkVoidCallBack {
void response();
}
public class ZkLockException extends Exception {
public ZkLockException() {
}
public ZkLockException(String message) {
super(message);
}
public ZkLockException(String message, Throwable cause) {
super(message, cause);
}
}
配置CuratorConfig
@Configuration
public class CuratorConfig {
@Value("${zk.connectionString}")
private String connectionString;
@Value("${zk.sessionTimeoutMs:500}")
private int sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs:500}")
private int connectionTimeoutMs;
@Value("${zk.dLockRoot:/dLock}")
private String dLockRoot;
@Bean
public CuratorFactoryBean curatorFactoryBean() {
return new CuratorFactoryBean(connectionString, sessionTimeoutMs, connectionTimeoutMs);
}
@Bean
@Autowired
public DLock dLock(CuratorFramework client) {
return new DLock(dLockRoot, client);
}
}
测试代码
@RestController
@RequestMapping("/dLock")
public class LockController {
@Autowired
private DLock dLock;
@RequestMapping("/lock")
public Map testDLock(String no){
final String path = Constant.keyBuilder("/test/no/", no);
Long mutex=0l;
try {
System.out.println("在拿锁:"+path+System.currentTimeMillis());
mutex = dLock.mutex(path, () -> {
try {
System.out.println("拿到锁了" + System.currentTimeMillis());
Thread.sleep(10000);
System.out.println("操作完成了" + System.currentTimeMillis());
} finally {
return System.currentTimeMillis();
}
}, 1000, TimeUnit.MILLISECONDS);
} catch (ZkLockException e) {
System.out.println("拿不到锁呀"+System.currentTimeMillis());
}
return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/dlock")
public Map testDLock1(String no){
final String path = Constant.keyBuilder("/test/no/", no);
Long mutex=0l;
try {
System.out.println("在拿锁:"+path+System.currentTimeMillis());
InterProcessMutex processMutex = dLock.mutex(path);
processMutex.acquire();
System.out.println("拿到锁了" + System.currentTimeMillis());
Thread.sleep(10000);
processMutex.release();
System.out.println("操作完成了" + System.currentTimeMillis());
} catch (ZkLockException e) {
System.out.println("拿不到锁呀"+System.currentTimeMillis());
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
}
return Collections.singletonMap("ret",mutex);
}
@RequestMapping("/del")
public Map delDLock(String no){
final String path = Constant.keyBuilder("/test/no/", no);
dLock.del(path);
return Collections.singletonMap("ret",1);
}
}
以上所述是小编给大家介绍的Java(SpringBoot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助.
来源:https://blog.csdn.net/LJY_SUPER/article/details/87807091


猜你喜欢
- AndroidStudio 的SVN 安装和使用方法与我以前用的其他IDE 都有很大差别,感觉特麻烦,网上相关资料很少,貌似现在 Git 比
- Guava Cache:⾕歌开源缓存框架Guava Cache是在内存中缓存数据,相比较于数据库或redis存储,访问内存中的数据会更加高效
- 在javaweb中写了一个图片的链接,可以打开预览,另外提供一个下载功能。以下是预览代码,没什么好说的;href若连接的是一个压缩包文件之类
- 本文实例为大家分享了Java实现Flappy Bird游戏的具体代码,供大家参考,具体内容如下1.首先在mainActivity.xml中放
- 本文实例为大家分享了java实现滑动验证解锁的具体代码,供大家参考,具体内容如下1.html:<div class="dra
- 🍊一. 为什么需要线程通信线程是并发并行的执行,表现出来是线程随机执行,但是我们在实际应用中对线程的执行顺序是有要求的,这就需要用到线程通信
- 本文实例为大家分享了Java实现打字游戏的具体代码,供大家参考,具体内容如下新建一个项目,然后在src里面建一个MyGame.java文件,
- dll的编写,首先是打开VS新建一个C++的控制台程序,下一步后选择dll以及空文档即可。然后就是添加一个类添加一个方法。方法排头固定格式
- 简介最近花了两天时间研究使用Flutter开发一个抖音国际版. 个人感觉使用Flutter开发app快得不要不要的额. 两天就基本可以开发个
- 这篇文章主要介绍了JDK线程池和Spring线程池的使用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值
- 本文实例为大家分享了C++实现哈夫曼树的编码解码,供大家参考,具体内容如下代码:#pragma once#include<iostre
- 前言:在本文中,我们将详细讨论Java中的一个核心概念——线程的生命周期。我们将使用一个快速的图解,
- 在spring中有很多以XXXAware命名的接口,很多人也不清楚这些接口都是做什么用的,这篇文章将描述常用的一些接口。一,Applicat
- 本文实例讲述了C#面向对象编程之猜拳游戏实现方法。分享给大家供大家参考。具体实现方法如下:1.需求现在要制作一个游戏,玩家与计算机进行猜拳游
- 在生产型Android客户端软件(企业级应用)开发中,界面可能存在多个输入(EditText)和多个操作(MotionEvent和KeyEv
- 基本要点1、定义根据百度百科的定义,RESTFUL是一种网络应用程序的设计风格和开发方式2、传统方式与Restful风格的区别在我们学习re
- 一、语音聊天说专业点就是即时语音,是一种基于网络的快速传递语音信息的技术,普遍应用于各类社交软件中,优势主要有以下几点:(1)时效性:视频直
- 摘要: 如何解决页面之间跳转时的黑屏问题呢?在默认情况下,Android应用程序启动时,会有一个黑屏的时期。原因是,首个activity会加
- 之前写过3篇手势密码的demo,不过没有集成到真实的企业项目中,这几天正好领到一个手势密码项目,昨天刚好弄完,今天抽空整理下,目前还没有完善
- 本文实例讲述了JDBC使用游标实现分页查询的方法。分享给大家供大家参考,具体如下:/*** 一次只从数据库中查询最大maxCount条记录*