软件编程
位置:首页>> 软件编程>> java编程>> Java多线程之条件对象Condition

Java多线程之条件对象Condition

作者:冬日毛毛雨  发布时间:2021-09-25 15:15:30 

标签:Java,多线程,Condition
目录
  • 1 简介

  • 2 Condition的实现分析

    • 等待队列

    • 等待(await):AbstractQueuedLongSynchronizer中实现

    • Condition等待通知的本质

    • 通知(signal):AbstractQueuedLongSynchronizer中实现

  • 3 Condition 实例

    • 三个线程依次打印ABC

    • 虚假唤醒

  • 4、总结

    1 简介

    Condition中的await()方法相当于Objectwait()方法,Condition中的signal()方法相当于Objectnotify()方法,Condition中的signalAll()相当于ObjectnotifyAll()方法。

    不同的是,Object中的wait() ,notify() ,notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

    2 Condition的实现分析

    Condition是同步器AbstractQueuedSynchronized的内部类,因为Condition的操作需要获取相关的锁,所以作为同步器的内部类比较合理。每个Condition对象都包含着一个队列(等待队列),该队列是Condition对象实现等待/通知功能的关键。

    等待队列

    等待队列是一个FIFO的队列,队列的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了await()方法,该线程就会释放锁、构造成节点进入等待队列并进入等待状态。

    Java多线程之条件对象Condition

    这里的节点定义也就是AbstractQueuedSynchronizer.Node的定义。

    一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法时,将会以当前线程构造节点,并将节点从尾部加入等待队列。

    Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock(同步器)拥有一个同步队列和多个等待队列。

    Java多线程之条件对象Condition

    等待(await):AbstractQueuedLongSynchronizer中实现

    调用Conditionawait()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。

    从队列的角度来看,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

    当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过Condition.signal()方法唤醒,而是对等待线程进行中断,则抛出InterruptedException


           public final void await() throws InterruptedException {
               if (Thread.interrupted())
                   throw new InterruptedException();
               Node node = addConditionWaiter();
               long savedState = fullyRelease(node);
               int interruptMode = 0;
               while (!isOnSyncQueue(node)) {
                   LockSupport.park(this);
                   if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                       break;
               }
               if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                   interruptMode = REINTERRUPT;
               if (node.nextWaiter != null) // clean up if cancelled
                   unlinkCancelledWaiters();
               if (interruptMode != 0)
                   reportInterruptAfterWait(interruptMode);
           }

    Condition等待通知的本质

    总的来说,Condition的本质就是等待队列和同步队列的交互:

    当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

    构造一个新的等待队列节点加入到等待队列队尾
    释放锁,也就是将它的同步队列节点从同步队列队首移除
    自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal() )或被中断
    阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
    当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

    从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

    对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。

    然后分两种情况讨论:

    如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
    如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.

    通知(signal):AbstractQueuedLongSynchronizer中实现

    调用Conditionsignal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

    ConditionsignalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,将等待队列中的节点全部移动到同步队列中,并唤醒每个节点的线程。


    public final void signal() {
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       Node first = firstWaiter;
       if (first != null)
           doSignal(first);
    }

    public final void signalAll() {
       if (!isHeldExclusively())
           throw new IllegalMonitorStateException();
       Node first = firstWaiter;
       if (first != null)
           doSignalAll(first);
    }

    最后还要注意,Java 中有 signal signalAll 两种方法,signal 是随机解除一个等待集中的线程的阻塞状态,signalAll 是解除所有等待集中的线程的阻塞状态。signal 方法的效率会比 signalAll 高,但是它存在危险,因为它一次只解除一个线程的阻塞状态,因此,如果等待集中有多个线程都满足了条件,也只能唤醒一个,其他的线程可能会导致死锁

    3 Condition 实例

    消费生产者模式:


    public class ConditionTest {
       public static void main(String[] args) {
           // 仓库
           Depot depot = new Depot(100);
           // 消费者
           Consumer consumer = new Consumer(depot);
           // 生产者
           Produce produce = new Produce(depot);
           produce.produceThing(5);
           consumer.consumerThing(5);
           produce.produceThing(2);
           consumer.consumerThing(5);
           produce.produceThing(3);
       }
    }

    class Depot {
       private int capacity;
       private int size;
       private Lock lock;
       private Condition consumerCond;
       private Condition produceCond;

    public Depot(int capacity) {
           this.capacity = capacity;
           this.size = 0;
           this.lock = new ReentrantLock();
           this.consumerCond = lock.newCondition();
           this.produceCond = lock.newCondition();
       }

    public void produce(int val) {
           lock.lock();
           try {
               int left = val;
               while (left > 0) {
                   while (size >= capacity) {
                       produceCond.await();
                   }
                   int produce = (left+size) > capacity ? (capacity-size) : left;
                   size += produce;
                   left -= produce;
                   System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
                   consumerCond.signalAll();
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           } finally {
               lock.unlock();
           }
       }

    public void consumer(int val) {
           lock.lock();
           try {
               int left = val;
               while (left > 0) {
                   while (size <= 0) {
                       consumerCond.await();
                   }
                   int consumer = (size <= left) ? size : left;
                   size -= consumer;
                   left -= consumer;
                   System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
                   produceCond.signalAll();
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           } finally {
               lock.unlock();
           }
       }
    }
    class Consumer {
       private Depot depot;
       public Consumer(Depot depot) {
           this.depot = depot;
       }

    public void consumerThing(final int amount) {
           new Thread(new Runnable() {
               public void run() {
                   depot.consumer(amount);
               }
           }).start();
       }
    }
    class Produce {
       private Depot depot;
       public Produce(Depot depot) {
           this.depot = depot;
       }

    public void produceThing(final int amount) {
           new Thread(new Runnable() {
               public void run() {
                   depot.produce(amount);
               }
           }).start();
       }
    }

    Thread-0, ProduceVal=5, produce=5, size=5
    Thread-1, ConsumerVal=5, consumer=5, size=0
    Thread-2, ProduceVal=2, produce=2, size=2
    Thread-3, ConsumerVal=5, consumer=2, size=0
    Thread-4, ProduceVal=3, produce=3, size=3
    Thread-3, ConsumerVal=5, consumer=3, size=0

    输出结果中,Thread-3出现两次,就是因为要消费5个产品,但仓库中只有2个产品,所以先将库存的2个产品全部消费,然后这个线程进入等待队列,等待生产,随后生产出了3个产品,生产者生产后又执行signalAll方法将等待队列中所有的线程都唤醒,Thread-3继续消费还需要的3个产品。

    三个线程依次打印ABC


    class Business {
       private Lock lock = new ReentrantLock();
       private Condition conditionA = lock.newCondition();
       private Condition conditionB = lock.newCondition();
       private Condition conditionC = lock.newCondition();
       private String type = "A"; //内部状态

    /*
        * 方法的基本要求为:
        * 1、该方法必须为原子的。
        * 2、当前状态必须满足条件。若不满足,则等待;满足,则执行业务代码。
        * 3、业务执行完毕后,修改状态,并唤醒指定条件下的线程。
        */
       public void printA() {
           lock.lock(); //锁,保证了线程安全。
           try {
               while (type != "A") { //type不为A,
                   try {
                       conditionA.await(); //将当前线程阻塞于conditionA对象上,将被阻塞。
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }

    //type为A,则执行。
               System.out.println(Thread.currentThread().getName() + " 正在打印A");
               type = "B"; //将type设置为B。
               conditionB.signal(); //唤醒在等待conditionB对象上的一个线程。将信号传递出去。
           } finally {
               lock.unlock(); //解锁
           }
       }

    public void printB() {
           lock.lock(); //锁
           try {
               while (type != "B") { //type不为B,
                   try {
                       conditionB.await(); //将当前线程阻塞于conditionB对象上,将被阻塞。
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }

    //type为B,则执行。
               System.out.println(Thread.currentThread().getName() + " 正在打印B");
               type = "C"; //将type设置为C。
               conditionC.signal(); //唤醒在等待conditionC对象上的一个线程。将信号传递出去。
           } finally {
               lock.unlock(); //解锁
           }
       }

    public void printC() {
           lock.lock(); //锁
           try {
               while (type != "C") {
                   try {
                       conditionC.await();
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               }

    System.out.println(Thread.currentThread().getName() + " 正在打印C");
               type = "A";
               conditionA.signal();
           } finally {
               lock.unlock(); //解锁
           }
       }
    }

    public class ConditionTest{

    public static void main(String[] args) {
           final Business business = new Business();//业务对象。

    //线程1号,打印10次A。
           Thread ta = new Thread(new Runnable() {

    @Override
               public void run() {
                   for(int i=0;i<10;i++){
                       business.printA();
                   }
               }
           });

    //线程2号,打印10次B。
           Thread tb = new Thread(new Runnable() {

    @Override
               public void run() {
                   for(int i=0;i<10;i++){
                       business.printB();
                   }
               }
           });

    //线程3号,打印10次C。
           Thread tc = new Thread(new Runnable() {

    @Override
               public void run() {
                   for(int i=0;i<10;i++){
                       business.printC();
                   }
               }
           });

    //执行3条线程。
           ta.start();
           tb.start();
           tc.start();
       }

    }

    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C
    Thread-0 正在打印A
    Thread-1 正在打印B
    Thread-2 正在打印C

    虚假唤醒

    所谓"虚假唤醒",即其他地方的代码触发了condition.signal() ,唤醒condition上等待的线程。但被唤醒的线程仍然不满足执行条件。

    condition通常与条件语句一起使用:


    if(!条件){
       condition.await(); //不满足条件,当前线程等待;
    }

    更好的方法是使用while:


    while(!条件){
       condition.await(); //不满足条件,当前线程等待;
    }

    在等待Condition时,允许发生"虚假唤醒",这通常作为对基础平台语义的让步。若使用"if(!条件)"则被"虚假唤醒"的线程可能继续执行。所以"while(!条件)"可以防止"虚假唤醒"。建议总是假定这些"虚假唤醒"可能发生,因此总是在一个循环中等待。

    4、总结

    如果知道Object的等待通知机制,Condition的使用是比较容易掌握的,因为和Object等待通知的使用基本一致。

    Condition的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。

    以下是笔者对等待队列是单向队列、同步队列是双向队列的一些思考,欢迎提出不同意见:

    之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由next指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用CAS把tail设置为新节点,然后才修改原tail的next指针到新节点的。因此用next向后遍历是不安全的,但是如果在设置新节点为tail前,为新节点设置prev,则可以保证从tail往前遍历是安全的。因此要安全的获取一个节点Node的下一个节点,先要看next是不是null,如果是null,还要从tail往前遍历看看能不能遍历到Node。

    而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要prev指针。

    来源:https://juejin.cn/post/7019161855608225805

    0
    投稿

    猜你喜欢

    手机版 软件编程 asp之家 www.aspxhome.com