详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别
作者:wavebeed 发布时间:2022-03-22 13:03:02
标签:java,CountDownLatch,CyclicBarrier
前言
CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。
内部实现差异
前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。
public class {
//Synchronization control For CountDownLatch.
//Uses AQS state to represent count.
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
... ...//
}
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
... ... //
}
实战 - 展示各自的使用场景
/**
*类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
*/
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
/*初始化线程*/
private static class InitThread implements Runnable{
public void run() {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work......");
latch.countDown();
for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ........continue do its work");
}
}
}
/*业务线程等待latch的计数器为0完成*/
private static class BusiThread implements Runnable{
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId()
+" do business-----");
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
/**
*类说明:共4个子线程,他们全部完成工作后,交出自己结果,
*再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
*/
class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(4,new CollectThread());
//存放子线程工作结果的容器
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<String,Long>();
public static void main(String[] args) {
for(int i=0;i<4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/*汇总的任务*/
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
/*相互等待的子线程*/
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId()+"",id);
try {
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do something ");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
两者总结
1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;
2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;
3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;
4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;
5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;
6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;
来源:https://blog.51cto.com/14815984/2496009


猜你喜欢
- 使用maven的profile功能,我们可以实现多环境配置文件的动态切换,可参考我的上一篇博客。但随着SpringBoot项目越来越火,越来
- 给组件加上 * 定义一个类,这个类继承ActionListenerpubulic class ButListener implements
- Handler、Message、Loopler、MessageQueen首先看一下我们平常使用Handler的一个最常见用法。Handler
- 本文实例为大家分享了convinientbanner顶部广告轮播控件的具体代码,供大家参考,具体内容如下gradle中添加compile &
- 本文实例讲述了Java实现SSL双向认证的方法。分享给大家供大家参考,具体如下:我们常见的SSL验证较多的只是验证我们的服务器是否是真实正确
- 要求:1、输入手机号,点击发送后随机生成6位数字码,2分钟有效2、输入验证码,点击验证,返回成功或失败3、每个手机号每天只能输入3次代码如下
- 参考ColorComboBox做修改,并对颜色名做些修正,用于CR MVMixer产品中,聊作备忘~效果图:代码://颜色拾取框using
- 公司的一个手机端的 CRM 项目最近要增加小票打印的功能,就是我们点外卖的时候经常会见到的那种小票。这里主要涉及到两大块的知识:蓝牙连接及数
- 关于Function.identity()的使用简单介绍话不多说,直接上JDK源码:static Function identity() {
- 本章讲述:FileStream类的基本功能,以及简单示例;1、引用命名空间:using System.IO;2、注意:使用IO操作文件时,要
- 一、读线圈状态/// <summary> /// 读线圈状态测试 &nbs
- 本文实例讲述了java实现清理DNS Cache的方法。分享给大家供大家参考。具体分析如下:一、测试环境OS:Windows7 x64JDK
- 本文实例讲述了C# Socket网络编程技巧。分享给大家供大家参考。具体分析如下:客户端要连接服务器:首先要知道服务器的IP地址。而服务器里
- spring xml中配置视图如果是如下<property name="defaultViews"><
- /* * 名称:RandomId * 功能:生成随机ID * 作者:冰麟轻武 * 日期:2012年1
- 先来回忆下在mybatis中的resultMap作用和是什么resultMap的作用是什么在使用传统的mybatis时,我们一般都会在xml
- 简介我们在前面的Android教程中已经提到过这么一件事:Android在启动后会有一个主线程。它不允许任何子线程去改变主UI线程里的内容。
- 前言延迟初始化 是一种将对象的创建延迟到第一次需要用时的技术,换句话说,对象的初始化是发生在真正需要的时候才执行,值得注意的是,术语&nbs
- 数据表格能够清晰的呈现数据信息,但是我们对于一些繁杂多变的数据想要很直观的看到数据变化走势或者数据的占比时,数据图表会更具代表性,并且在呈现
- 1.通过看logcat下的日志2.通过adb命令3.通过写代码获取3.1写一个工具类打印系统时间3.2 在Application启动的时候打