CountDownLatch和Atomic原子操作类源码解析
作者:Q.E.D 发布时间:2023-06-07 06:31:52
引导语
本小节和大家一起来看看 CountDownLatch 和 Atomic 打头的原子操作类,CountDownLatch 的源码非常少,看起来比较简单,但 CountDownLatch 的实际应用却不是很容易;Atomic 原子操作类就比较好理解和应用,接下来我们分别来看一下。
1、CountDownLatch
CountDownLatch 中文有的叫做计数器,也有翻译为计数锁,其最大的作用不是为了加锁,而是通过计数达到等待的功能,主要有两种形式的等待:
让一组线程在全部启动完成之后,再一起执行(先启动的线程需要阻塞等待后启动的线程,直到一组线程全部都启动完成后,再一起执行);
主线程等待另外一组线程都执行完成之后,再继续执行。
我们会举一个示例来演示这两种情况,但在这之前,我们先来看看 CountDownLatch 的底层源码实现,这样就会清晰一点,不然一开始就来看示例,估计很难理解。
CountDownLatch 有两个比较重要的 API,分别是 await 和 countDown,管理着线程能否获得锁和锁的释放(也可以称为对 state 的计数增加和减少)。
1.1、await
await 我们可以叫做等待,也可以叫做加锁,有两种不同入参的方法,源码如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的,最终都会转化成毫秒
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
两个方法底层使用的都是 sync,sync 是一个同步器,是 CountDownLatch 的内部类实现的,如下:
private static final class Sync extends AbstractQueuedSynchronizer {}
可以看出来 Sync 继承了 AbstractQueuedSynchronizer,具备了同步器的通用功能。
无参 await 底层使用的是 acquireSharedInterruptibly 方法,有参的使用的是 tryAcquireSharedNanos 方法,这两个方法都是 AQS 的方法,底层实现很相似,主要分成两步:
1.使用子类的 tryAcquireShared 方法尝试获得锁,如果获取了锁直接返回,获取不到锁走 2;
2.获取不到锁,用 Node 封装一下当前线程,追加到同步队列的尾部,等待在合适的时机去获得锁。
第二步是 AQS 已经实现了,第一步 tryAcquireShared 方法是交给 Sync 实现的,源码如下:
// 如果当前同步器的状态是 0 的话,表示可获得锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
获得锁的代码也很简单,直接根据同步器的 state 字段来进行判断,但还是有两点需要注意一下:
获得锁时,state 的值不会发生变化,像 ReentrantLock 在获得锁时,会把 state + 1,但 CountDownLatch 不会;
CountDownLatch 的 state 并不是 AQS 的默认值 0,而是可以赋值的,是在 CountDownLatch 初始化的时候赋值的,
代码如下:
// 初始化,count 代表 state 的初始化值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// new Sync 底层代码是 state = count;
this.sync = new Sync(count);
}
这里的初始化的 count 和一般的锁意义不太一样,count 表示我们希望等待的线程数,在两种不同的等待场景中,count 有不同的含义:
让一组线程在全部启动完成之后,再一起执行的等待场景下, count 代表一组线程的个数;
主线程等待另外一组线程都执行完成之后,再继续执行的等待场景下,count 代表一组线程的个数。
所以我们可以把 count 看做我们希望等待的一组线程的个数,可能我们是等待一组线程全部启动完成,可能我们是等待一组线程全部执行完成。
1.2、countDown
countDown 中文翻译为倒计时,每调用一次,都会使 state 减一,底层调用的方法如下:
public void countDown() {
sync.releaseShared(1);
}
releaseShared 是 AQS 定义的方法,方法主要分成两步:
1.尝试释放锁(tryReleaseShared),锁释放失败直接返回,释放成功走2
2.释放当前节点的后置等待节点。
第二步 AQS 已经实现了,第一步是 Sync 实现的,我们一起来看下 tryReleaseShared 方法的实现源码:
// 对 state 进行递减,直到 state 变成 0;
// state 递减为 0 时,返回 true,其余返回 false
protected boolean tryReleaseShared(int releases) {
// 自旋保证 CAS 一定可以成功
for (;;) {
int c = getState();
// state 已经是 0 了,直接返回 false
if (c == 0)
return false;
// 对 state 进行递减
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
从源码中可以看到,只有到 count 递减到 0 时,countDown 才会返回 true。
1.3、示例
看完 CountDownLatch 两个重要 API 后,我们来实现文章开头说的两个功能:
让一组线程在全部启动完成之后,再一起执行;
主线程等待另外一组线程都执行完成之后,再继续执行。
代码在 CountDownLatchDemo 类中,大家可以调试看看,源码如下:
public class CountDownLatchDemo {
// 线程任务
class Worker implements Runnable {
// 定义计数锁用来实现功能 1
private final CountDownLatch startSignal;
// 定义计数锁用来实现功能 2
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
// 子线程做的事情
public void run() {
try {
System.out.println(Thread.currentThread().getName()+" begin");
// await 时有两点需要注意:await 时 state 不会发生变化,2:startSignal 的state初始化是 1,所以所有子线程都是获取不到锁的,都需要到同步队列中去等待,达到先启动的子线程等待后面启动的子线程的结果
startSignal.await();
doWork();
// countDown 每次会使 state 减一,doneSignal 初始化为 9,countDown 前 8 次执行都会返回 false (releaseShared 方法),执行第 9 次时,state 递减为 0,会 countDown 成功,表示所有子线程都执行完了,会释放 await 在 doneSignal 上的主线程
doneSignal.countDown();
System.out.println(Thread.currentThread().getName()+" end");
} catch (InterruptedException ex) {
} // return;
}
void doWork() throws InterruptedException {
System.out.println(Thread.currentThread().getName()+"sleep 5s …………");
Thread.sleep(5000l);
}
}
@Test
public void test() throws InterruptedException {
// state 初始化为 1 很关键,子线程是不断的 await,await 时 state 是不会变化的,并且发现 state 都是 1,所有线程都获取不到锁
// 造成所有线程都到同步队列中去等待,当主线程执行 countDown 时,就会一起把等待的线程给释放掉
CountDownLatch startSignal = new CountDownLatch(1);
// state 初始化成 9,表示有 9 个子线程执行完成之后,会唤醒主线程
CountDownLatch doneSignal = new CountDownLatch(9);
for (int i = 0; i < 9; ++i) // create and start threads
{
new Thread(new Worker(startSignal, doneSignal)).start();
}
System.out.println("main thread begin");
// 这行代码唤醒 9 个子线程,开始执行(因为 startSignal 锁的状态是 1,所以调用一次 countDown 方法就可以释放9个等待的子线程)
startSignal.countDown();
// 这行代码使主线程陷入沉睡,等待 9 个子线程执行完成之后才会继续执行(就是等待子线程执行 doneSignal.countDown())
doneSignal.await();
System.out.println("main thread end");
}
}
执行结果:
Thread-0 begin
Thread-1 begin
Thread-2 begin
Thread-3 begin
Thread-4 begin
Thread-5 begin
Thread-6 begin
Thread-7 begin
Thread-8 begin
main thread begin
Thread-0sleep 5s …………
Thread-1sleep 5s …………
Thread-4sleep 5s …………
Thread-3sleep 5s …………
Thread-2sleep 5s …………
Thread-8sleep 5s …………
Thread-7sleep 5s …………
Thread-6sleep 5s …………
Thread-5sleep 5s …………
Thread-0 end
Thread-1 end
Thread-4 end
Thread-3 end
Thread-2 end
Thread-8 end
Thread-7 end
Thread-6 end
Thread-5 end
main thread end
从执行结果中,可以看出已经实现了以上两个功能,实现比较绕,大家可以根据注释,debug 看一看。
2、Atomic 原子操作类
Atomic 打头的原子操作类有很多,涉及到 Java 常用的数字类型的,基本都有相应的 Atomic 原子操作类,如下图所示:
Atomic 打头的原子操作类,在高并发场景下,都是线程安全的,我们可以放心使用。
我们以 AtomicInteger 为例子,来看下主要的底层实现:
private volatile int value;
// 初始化
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 得到当前值
public final int get() {
return value;
}
// 自增 1,并返回自增之前的值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 自减 1,并返回自增之前的值
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
从源码中,我们可以看到,线程安全的操作方法,底层都是使用 unsafe 方法实现,以上几个 unsafe 方法不是使用 Java 实现的,都是线程安全的。
AtomicInteger 是对 int 类型的值进行自增自减,那如果 Atomic 的对象是个自定义类怎么办呢,Java 也提供了自定义对象的原子操作类,叫做 AtomicReference。AtomicReference 类可操作的对象是个泛型,所以支持自定义类,其底层是没有自增方法的,操作的方法可以作为函数入参传递,源码如下:
// 对 x 执行 accumulatorFunction 操作
// accumulatorFunction 是个函数,可以自定义想做的事情
// 返回老值
public final V getAndAccumulate(V x,
BinaryOperator<V> accumulatorFunction) {
// prev 是老值,next 是新值
V prev, next;
// 自旋 + CAS 保证一定可以替换老值
do {
prev = get();
// 执行自定义操作
next = accumulatorFunction.apply(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
3、总结
CountDownLatch 的源码实现简单,但真的要用好还是不简单的,其使用场景比较复杂,建议同学们可以 debug 一下
CountDownLatchDemo,在增加实战能力基础上,增加底层的理解能力。
来源:https://blog.csdn.net/qq_34272760/article/details/120642263
猜你喜欢
- WPF实现滚动条还是比较方便的,只要在控件外围加上ScrollViewer即可,但美中不足的是:滚动的时候没有动画效果。在滚动的时候添加过渡
- RibbonRibbon 是 Netflix开源的基于HTTP和TCP等协议负载均衡组件Ribbon 可以用来做客户端负载均衡,调用注册中心
- 一、需要自定义登录结果的场景在我之前的文章中,做过登录验证流程的源码解析。其中比较重要的就是当我们登录成功的时候,是由Authenticat
- IDEA 报错:无效的源发行版问题描述从SVN拉项目代码到本地后用idea运行,发现几个报错,关键的一个是:无效的源发行版,考虑是JDK版本
- 需求:之前项目一个变动,需要对3张mysql数据库表数据进行清洗,3张表表名不同,表结构完全相同,需要对这3张表进行相同的增、改、查动作,一
- 附加依赖项属性是一个属性本来不属于对象自己,但是某些特定场景其他的对象要使用该对象在这种场景下的值。这个值只在这个场景下使用。基于这个需求设
- 报文(message)是网络中交换与传输的数据单元,即站点一次性要发送的数据块。报文包含了将要发送的完整的数据信息,其长短很不一致,长度不限
- 一、why(为什么要用Hibernate缓存?)Hibernate是一个持久层框架,经常访问物理数据库。为了降低应用程序对物理数据源访问的频
- 目录首先,写一个需求文档:一、登录界面1.界面2.登录3.退出二、开始游戏界面三、缓冲加载游戏界面四、游戏主界面五、结束界面上代码首先,写一
- 如果有哪一个做程序员的小伙伴说自己没有遇到中文乱码问题,我是不愿意相信的。今天在做微信订阅号的智能回复时,又一时迷乱的跳进了中文乱码这个火坑
- 本文实例讲述了C#数字图像处理之图像缩放的方法。分享给大家供大家参考。具体如下://定义图像缩放函数private static Bitma
- LockSupport 简介LockSupport 是 Java 并发编程中一个非常重要的组件,我们熟知的并发组件 Lock、线程池、Cou
- 本文研究的主要是java网络爬虫连接超时的问题,具体如下。在网络爬虫中,经常会遇到如下报错。即连接超时。针对此问题,一般解决思路为:将连接时
- 本文实例讲述了C#实现将DataTable内容输出到Excel表格的方法。分享给大家供大家参考。具体如下:1.关于本文本文描述了一个函数(S
- 绘制模糊数学中隶属函数分布图using System; using System.Collect
- 前言在项目中,如果我们要遵循分层领域模型规约: 话,肯定避免不了在DTO、VO、BO、AO、VO、Query等实体的转换,我们通常有几种做法
- 自定义Starter命名规则注意artifactId的命名规则,Spring官方Starter通常命名为spring-boot-starte
- Spring cloud网关gateway进行websocket路由转发规则配置一、websocket及http路由转发规则配置后端是普通的
- 前言在我们开发过程中,由于主流的架构都是采用前后端分离的方式,我们作为后端开发者需要为前段持续地提供运行在容器中最新代码,虽然可
- 滑动删除的部分主要包含两个部分, 一个是内容区域(用于放置正常显示的view),另一个是操作区域(用于放置删除按钮)。默认情况下,操作区域是