软件编程
位置:首页>> 软件编程>> java编程>> Java 根据某个 key 加锁的实现方式

Java 根据某个 key 加锁的实现方式

作者:明明如月学长  发布时间:2023-10-27 07:54:20 

标签:Java,key,加锁

一、背景

日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
大家可以借助每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁;每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

二、参考代码

接口定义

public interface LockByKey<T> {

/**
    * 加锁
    */
   void lock(T key);

/**
    * 解锁
    */
   void unlock(T key);
}

2.1 同一个 key 只能一个线程执行

2.1.1 代码实现

每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁。

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultLockByKeyImpl<T> implements LockByKey<T> {

private final Map<T, ReentrantLock> lockMap = new ConcurrentHashMap<>();

/**
    * 加锁
    */
   @Override
   public void lock(T key) {
       // 如果key为空,直接返回
       if (key == null) {
           throw new IllegalArgumentException("key 不能为空");
       }

// 获取或创建一个ReentrantLock对象
       ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
       // 获取锁
       lock.lock();
   }

/**
    * 解锁
    */
   @Override
   public void unlock(T key) {
       // 如果key为空,直接返回
       if (key == null) {
           throw new IllegalArgumentException("key 不能为空");
       }

// 从Map中获取锁对象
       ReentrantLock lock = lockMap.get(key);
       // 获取不到报错
       if (lock == null) {
           throw new IllegalArgumentException("key " + key + "尚未加锁");
       }
       // 其他线程非法持有不允许释放
       if (!lock.isHeldByCurrentThread()) {
           throw new IllegalStateException("当前线程尚未持有,key:" + key + "的锁,不允许释放");
       }
       lock.unlock();
   }
}

注意事项:
(1)参数合法性校验
(2)解锁时需要判断该锁是否为当前线程持有

2.1.2 编写单测

import com.google.common.collect.Lists;
import org.junit.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DefaultLockByKeyImplTest {

private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>();

private final CountDownLatch countDownLatch = new CountDownLatch(7);
   private final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Test
   public void test() throws InterruptedException {
       List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
       Set<String> executingKeySet = new HashSet<>();

for (int i = 0; i < keys.size(); i++) {
           String key = keys.get(i);
           int finalI = i;
           executorService.submit(() -> {
               lockByKey.lock(key);
               if (executingKeySet.contains(key)) {
                   throw new RuntimeException("存在正在执行的 key:" + key);
               }
               executingKeySet.add(key);

try {
                   System.out.println("index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName());
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               } finally {
                   System.out.println("index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName());
                   lockByKey.unlock(key);
                   executingKeySet.remove(key);
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await();
   }
}

如果同一个 key 没释放能够再次进入,会抛出异常。
也可以通过日志来观察执行情况:

index:0对 [a] 加锁 ->pool-1-thread-1
index:6对 [d] 加锁 ->pool-1-thread-7
index:4对 [c] 加锁 ->pool-1-thread-5
index:3对 [b] 加锁 ->pool-1-thread-4
index:6释放 [d] ->pool-1-thread-7
index:4释放 [c] ->pool-1-thread-5
index:0释放 [a] ->pool-1-thread-1
index:3释放 [b] ->pool-1-thread-4

index:1对 [a] 加锁 ->pool-1-thread-2
index:5对 [b] 加锁 ->pool-1-thread-6
index:1释放 [a] ->pool-1-thread-2
index:5释放 [b] ->pool-1-thread-6

index:2对 [a] 加锁 ->pool-1-thread-3
index:2释放 [a] ->pool-1-thread-3

2.2、同一个 key 可以有 n个线程执行

2.2.1 代码实现

每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

import lombok.SneakyThrows;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class SimultaneousEntriesLockByKey<T> implements LockByKey<T> {

private final Map<T, Semaphore> semaphores = new ConcurrentHashMap<>();

/**
    * 最大线程
    */
   private int allowed_threads;

public SimultaneousEntriesLockByKey(int allowed_threads) {
       this.allowed_threads = allowed_threads;
   }

/**
    * 加锁
    */
   @Override
   public void lock(T key) {
       Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v);
       semaphore.acquireUninterruptibly();
   }

/**
    * 解锁
    */
   @Override
   public void unlock(T key) {
       // 如果key为空,直接返回
       if (key == null) {
           throw new IllegalArgumentException("key 不能为空");
       }

// 从Map中获取锁对象
       Semaphore semaphore = semaphores.get(key);
       if (semaphore == null) {
           throw new IllegalArgumentException("key " + key + "尚未加锁");
       }
       semaphore.release();
       if (semaphore.availablePermits() >= allowed_threads) {
           semaphores.remove(key, semaphore);
       }
   }

2.2.2 测试代码

import com.google.common.collect.Lists;
import org.junit.Test;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SimultaneousEntriesLockByKeyTest {

private final int maxThreadEachKey = 2;
   private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);

private final CountDownLatch countDownLatch = new CountDownLatch(7);
   private final ExecutorService executorService = Executors.newFixedThreadPool(10);

@Test
   public void test() throws InterruptedException {
       List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
       Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>());

for (int i = 0; i < keys.size(); i++) {
           String key = keys.get(i);
           int finalI = i;
           executorService.submit(() -> {
               lockByKey.lock(key);
               executingKeyCount.compute(key, (k, v) -> {
                   if (v != null && v + 1 > maxThreadEachKey) {
                       throw new RuntimeException("超过限制了");
                   }
                   return v == null ? 1 : v + 1;
               });
               try {
                   System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key));
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               } finally {
                   System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1));
                   lockByKey.unlock(key);
                   executingKeyCount.compute(key, (k, v) -> v - 1);
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await();
   }
}

输出:

time:2023-03-15T20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
time:2023-03-15T20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
time:2023-03-15T20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
time:2023-03-15T20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
time:2023-03-15T20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
time:2023-03-15T20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
time:2023-03-15T20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
time:2023-03-15T20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
time:2023-03-15T20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
time:2023-03-15T20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
time:2023-03-15T20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
time:2023-03-15T20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
time:2023-03-15T20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
time:2023-03-15T20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0

三、总结

本文结合自己的理解和一些参考代码,给出自己的示例,希望对大家有帮助。

来源:https://blog.csdn.net/w605283073/article/details/129568863

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com