软件编程
位置:首页>> 软件编程>> java编程>> Java多线程之同步工具类Exchanger

Java多线程之同步工具类Exchanger

作者:冬日毛毛雨  发布时间:2022-07-05 03:50:54 

标签:Java,多线程,Exchanger
目录
  • 1 Exchanger 介绍

  • 2 Exchanger 实例

    • exchange等待超时

  • 3 实现原理

    1 Exchanger 介绍

    前面分别介绍了CyclicBarrier、CountDownLatch、Semaphore,现在介绍并发工具类中的最后一个Exchange
    Exchanger 是一个用于线程间协作的工具类,Exchanger用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange 方法交换数据,如果第一个线程先执行exchange 方法,它会一直等待第二个线程也执行exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据。

    A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

    在以上的描述中,有几个要点:

    • 此类提供对外的操作是同步的;

    • 用于成对出现的线程之间交换数据;

    • 可以视作双向的同步队列;

    • 可应用于基因算法、流水线设计等场景。

    • 接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:


    public V exchange(V x) throws InterruptedException
    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

    2 Exchanger 实例


    public class ExchangerTest {
       public static void main(String[] args) {
           ExecutorService executor = Executors.newCachedThreadPool();
           final Exchanger exchanger = new Exchanger();
           executor.execute(new Runnable() {
               String data = "data1";

    @Override
               public void run() {
                   doExchangeWork(data, exchanger);
               }
           });

    executor.execute(new Runnable() {
               String data = "data2";

    @Override
               public void run() {
                   doExchangeWork(data, exchanger);
               }
           });
           executor.shutdown();
       }

    private static void doExchangeWork(String data, Exchanger exchanger) {
           try {
               System.out.println(Thread.currentThread().getName() + "正在把数据 " + data + " 交换出去");
               Thread.sleep((long) (Math.random() * 1000));

    String exchangeData = (String) exchanger.exchange(data);
               System.out.println(Thread.currentThread().getName() + "交换得到数据  " + exchangeData);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }

    pool-1-thread-1正在把数据 data1 交换出去
    pool-1-thread-2正在把数据 data2 交换出去
    pool-1-thread-2交换得到数据  data1
    pool-1-thread-1交换得到数据  data2

    当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

    exchange等待超时


    public class ExchangerTest {
       public static void main(String[] args) {
           ExecutorService executor = Executors.newCachedThreadPool();
           final Exchanger exchanger = new Exchanger();
           executor.execute(new Runnable() {
               String data = "data1";

    @Override
               public void run() {
                   doExchangeWork(data, exchanger);
               }
           });

    executor.execute(new Runnable() {
               String data = "data2";

    @Override
               public void run() {
                   try {
                       Thread.sleep((long) (3000));
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
                   doExchangeWork(data, exchanger);
               }
           });
           executor.shutdown();
       }

    private static void doExchangeWork(String data, Exchanger exchanger) {
           try {
               System.out.println(Thread.currentThread().getName() + "正在把数据 " + data + " 交换出去");

    //远小于3秒抛出异常
               String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS);
               System.out.println(Thread.currentThread().getName() + "交换得到数据  " + exchangeData);
           } catch ( TimeoutException e) {
               e.printStackTrace();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    }

    pool-1-thread-1正在把数据 data1 交换出去
    java.util.concurrent.TimeoutException
        at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
        at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
        at ExchangerTest.access$000(ExchangerTest.java:3)
        at ExchangerTest$1.run(ExchangerTest.java:12)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    pool-1-thread-2正在把数据 data2 交换出去
    java.util.concurrent.TimeoutException
        at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
        at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
        at ExchangerTest.access$000(ExchangerTest.java:3)
        at ExchangerTest$2.run(ExchangerTest.java:26)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    实战场景:

    设计一个定时任务,每日凌晨执行。在定时任务中启动两个线程,一个线程负责对业务明细表(xxx_info)进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表(xxx_statistics)中。
    亲,这样的场景是不是听起来很有感觉?没错!两个线程在内存中批量交换数据,这个事情我们可以使用Exchanger去做!

    3 实现原理

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。
    Exchanger类提供了两个方法,String exchange(V x):用于交换,启动交换并等待另一个线程调用exchangeString exchange(V x,long timeout,TimeUnit unit):用于交换,启动交换并等待另一个线程调用exchange,并且设置最大等待时间,当等待时间超过timeout便停止等待。

    来源:https://juejin.cn/post/7021034812295102501

    0
    投稿

    猜你喜欢

    手机版 软件编程 asp之家 www.aspxhome.com