软件编程
位置:首页>> 软件编程>> java编程>> java线程并发控制同步工具CountDownLatch

java线程并发控制同步工具CountDownLatch

作者:叫我小郭  发布时间:2022-09-02 12:18:06 

标签:java,线程并发,CountDownLatch

前言

大家好,我是小郭,前面我们学习了利用Semaphore来防止多线程同时操作一个资源,通常我们都会利用并行来优化性能,但是对于串行化的业务,可能需要按顺序执行,那我们怎么才能处理呢?今天我们来学习另一个并发流程控制的同步工具CountDownLatch。

了解CountDownLatch

首先,CountDownLatch是一种并发流程控制的同步工具。

主要的作用是等待多个线程同时完成任务之后,再继续完成主线程任务。

简单点可以理解为,几个小伙伴一起到火锅店聚餐,人到齐了,火锅店才可以开饭。

思考问题:

  • CountDownLatch 底层原理是什么,他是否可以代替wait / notify?

  • CountDwonLatch 业务场景有哪些?

  • 一次可以唤醒多个任务吗?

主要参数与方法

//减少锁存器的计数,如果计数达到零,则释放所有等待线程。
//计数器
public void countDown() {
   sync.releaseShared(1);
}
//导致当前线程等待,直到锁存器递减至零为止,除非该线程被中断。
//火锅店调用await的线程,count为0才能继续执行
public void await() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);
}

构造方法

//count 数量,理解为小伙伴的个数
public CountDownLatch(int count) {
   if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
}
//获取剩余的数量
public long getCount() {
   return sync.getCount();
}

CountDownLatch底层实现原理

我们可以看出countDown()是CountDownLatch的核心方法,我来看下他的具体实现。

java线程并发控制同步工具CountDownLatch

CountDownLatch来时继承AQS的共享模式来完成其的实现,从前面的学习得出AQS主要是依赖同步队列和state实现控制。

共享模式:

这里与独占锁大多数相同,自旋过程中的退出条件是是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态.

await

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
   return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//当状态不为0挂起,表示当前线程被占有,需要线程排队
protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}
//在共享模式下获取
doAcquireSharedInterruptibly(int arg)

countDown

public void countDown() {
   sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//自旋防止失败
   for (;;) {
       //获取状态
       int c = getState();
       //状态为为0返回false,表示没有被线程占有
       if (c == 0) return false;
       //调用cas来进行替换,也保证了线程安全,当为0的时候唤醒
       int nextc = c-1;
       if (compareAndSetState(c, nextc))
           return nextc == 0;
   }
}
//当任务数量为0,aqs的释放共享锁
void doReleaseShared()
private void doReleaseShared() {
   /*
       * Ensure that a release propagates, even if there are other
       * in-progress acquires/releases.  This proceeds in the usual
       * way of trying to unparkSuccessor of head if it needs
       * signal. But if it does not, status is set to PROPAGATE to
       * ensure that upon release, propagation continues.
       * Additionally, we must loop in case a new node is added
       * while we are doing this. Also, unlike other uses of
       * unparkSuccessor, we need to know if CAS to reset status
       * fails, if so rechecking.
       */
   // 无限循环
   for (;;) {
       // 保存头节点
       Node h = head;
       // 头节点不为空并且头节点不为尾结点
       if (h != null && h != tail) {
           // 获取头节点的等待状态
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
               // 状态为SIGNAL,CAS更新状态
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               // 释放后继结点
               unparkSuccessor(h);
           }
           // 状态为0并且更新不成功,继续
           else if (ws == 0 &&
                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //
               continue;                // loop on failed CAS
       }
       if (h == head) // 若头节点改变,继续循环  
           break;
   }
}

思考

  • 如何安排线程排序

个人认为,没有进行线程的排序,而是让一部分线程进入等待,在唤醒的时候放开。

执行流程图

java线程并发控制同步工具CountDownLatch

实践

用法一:

一个线程等待其他多个线程都执行完毕,再继续自己的工作

public class CountDownLatchTest {
   private static Lock lock = new ReentrantLock();
   private static CountDownLatch countDownLatch = new CountDownLatch(4);
   public static void main(String[] args) {
       ExecutorService executorService = Executors.newFixedThreadPool(4);
       IntStream.range(0,16).forEach(i -&gt;{
           executorService.submit(()-&gt;{
               lock.lock();
               System.out.println(Thread.currentThread().getName()+ "来火锅店吃火锅!");
               try {
                   Thread.sleep(1000);
                   countDownLatch.countDown();
                   System.out.println(Thread.currentThread().getName() + "我到火锅店了,准备开吃!");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               } finally {
                   lock.unlock();
               }
           });
       });
       try {
           countDownLatch.await(5,TimeUnit.SECONDS);
           System.out.println("人到齐了,开饭");
           executorService.shutdown();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

输出结果

java线程并发控制同步工具CountDownLatch

代码中设置了一个CountDownLatch做倒计时,四个人(count为4)一起到火锅店吃饭,每到一个人计数器就减去1(countDownLatch.countDown()),当计数器为0的时候,main线程在await的阻塞结束,继续往下执行。

用法二:

多个线程等待某一个线程的信号,同时开始执行

用抢位子作为例子,将线程挂起等待,同时开始执行。

public class CountDownLatchTest2 {
   private static Lock lock = new ReentrantLock();
   private static CountDownLatch countDownLatch = new CountDownLatch(1);
   public static void main(String[] args) {
       ExecutorService executorService = Executors.newFixedThreadPool(4);
       IntStream.range(0,4).forEach(i ->{
           executorService.submit(()->{
               System.out.println(Thread.currentThread().getName()+ "准备开始抢位子!");
               try {
                   //Thread.sleep(1000);
                   countDownLatch.await();
                   System.out.println(Thread.currentThread().getName() + "抢到了位置");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
       });
       try {
           Thread.sleep(5000);
           System.out.println("五秒后开始抢位置");
           countDownLatch.countDown();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       executorService.shutdown();
   }
}

注意点

CountDownLatch是不能重用的。

来源:https://juejin.cn/post/7107873972015857678

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com