Java多线程之同步工具类Exchanger
作者:冬日毛毛雨 发布时间:2022-07-05 03:50:54
目录
1 Exchanger 介绍
2 Exchanger 实例
exchange等待超时
3 实现原理
1 Exchanger 介绍
前面分别介绍了CyclicBarrier、CountDownLatch、Semaphore,现在介绍并发工具类中的最后一个Exchange
。Exchanger
是一个用于线程间协作的工具类,Exchanger
用于进行线程间的数据交换,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange
方法交换数据,如果第一个线程先执行exchange
方法,它会一直等待第二个线程也执行exchange
方法,当两个线程都到达同步点时,这两个线程就可以交换数据。
A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
在以上的描述中,有几个要点:
此类提供对外的操作是同步的;
用于成对出现的线程之间交换数据;
可以视作双向的同步队列;
可应用于基因算法、流水线设计等场景。
接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
2 Exchanger 实例
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data = "data1";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.execute(new Runnable() {
String data = "data2";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.shutdown();
}
private static void doExchangeWork(String data, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "正在把数据 " + data + " 交换出去");
Thread.sleep((long) (Math.random() * 1000));
String exchangeData = (String) exchanger.exchange(data);
System.out.println(Thread.currentThread().getName() + "交换得到数据 " + exchangeData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-1正在把数据 data1 交换出去
pool-1-thread-2正在把数据 data2 交换出去
pool-1-thread-2交换得到数据 data1
pool-1-thread-1交换得到数据 data2
当线程A调用Exchange
对象的exchange()
方法后,他会陷入阻塞状态,直到线程B也调用了exchange()
方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。
exchange等待超时
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
executor.execute(new Runnable() {
String data = "data1";
@Override
public void run() {
doExchangeWork(data, exchanger);
}
});
executor.execute(new Runnable() {
String data = "data2";
@Override
public void run() {
try {
Thread.sleep((long) (3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
doExchangeWork(data, exchanger);
}
});
executor.shutdown();
}
private static void doExchangeWork(String data, Exchanger exchanger) {
try {
System.out.println(Thread.currentThread().getName() + "正在把数据 " + data + " 交换出去");
//远小于3秒抛出异常
String exchangeData = (String) exchanger.exchange(data,1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "交换得到数据 " + exchangeData);
} catch ( TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
pool-1-thread-1正在把数据 data1 交换出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$1.run(ExchangerTest.java:12)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
pool-1-thread-2正在把数据 data2 交换出去
java.util.concurrent.TimeoutException
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at ExchangerTest.doExchangeWork(ExchangerTest.java:37)
at ExchangerTest.access$000(ExchangerTest.java:3)
at ExchangerTest$2.run(ExchangerTest.java:26)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
实战场景:
设计一个定时任务,每日凌晨执行。在定时任务中启动两个线程,一个线程负责对业务明细表(xxx_info)进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表(xxx_statistics)中。
亲,这样的场景是不是听起来很有感觉?没错!两个线程在内存中批量交换数据,这个事情我们可以使用Exchanger去做!
3 实现原理
Exchanger
(交换者)是一个用于线程间协作的工具类。Exchanger
用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange
方法交换数据, 如果第一个线程先执行exchange
方法,它会一直等待第二个线程也执行exchange
,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger
的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的。
Exchanger类提供了两个方法,String exchange(V x):用
于交换,启动交换并等待另一个线程调用exchange
;String
exchange(V x,long timeout,TimeUnit unit):用于交换,启动交换并等待另一个线程调用exchange
,并且设置最大等待时间,当等待时间超过timeout
便停止等待。
来源:https://juejin.cn/post/7021034812295102501
猜你喜欢
- 问题介绍:用二维数组表示一个迷宫,设置迷宫起点和终点,输出迷宫中的一条通路实现思路:二维数组表示迷宫:0表示路且未走过、1表示墙、2表示通路
- 详解java中接口与抽象类的区别1.abstract class 在 Java 语言中表示的是一种继承关系,一个类只能使用一次继承关系。但是
- JVM内存组成结构JVM栈由堆、栈、本地方法栈、方法区等部分组成,结构图如下所示:1)堆所有通过new创建的对象的内存都在堆中分配,其大小可
- 一,背景之所以会想到一个服务同时使用eureka和nacos,是因为遇到一个需求,配置数据是存储在nacos的配置中,然后使用该配置的服务却
- 遇到的问题!注:自定义CommentGenerator的都知道通过实现CommentGenerator接口的一些不足,毕竟只是实现了Comm
- 我们来简单实现一个cookie。一、简单介绍Cookie 是一些数据, 存储于你电脑上的文本文件中。当 web 服务器向浏览器发送 web
- 条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让
- 一、logback日志技术介绍Spring Boot中使用的日志技术为logback。其与Log4J都出自同一人,性能要优于Log4J,是L
- java的String对象底层是有字符数组存储的,理论上char[] 最大长度是int的最大值,实际思路:首先,String字面
- Android的应用被限制为最多占用16m的内存,至少在T-Mobile G1上是这样的(当然现在已经有几百兆的内存可以用了——译者注)。它
- java 算法之希尔排序一、思想 希尔排序:使数组中任意间隔为h的元素都是有序的。在进行排序的时候,如果h很大,我们就能将元素移动到很远的地
- 前言最近数据库大作业要连接数据库,看了很多博客文章终于连接好了,但是没有看到一篇博客是能直接连接完成的,所以在这记录一下希望能帮助大家sql
- 基本要点1、Lombok作用:在我们的实体类中,我们再也不需要声明get、set、有参无参等方法,统统可以通过Lombok注解来实现同样的功
- 前言对于数组遍历,基本上每个开发者都写过,遍历本身没什么好说的,但是当我们在遍历的过程中,有一些复杂的业务逻辑时,将会发现代码的层级会逐渐加
- 功能描述1、创建扑克牌。包括四种花色(黑桃,红心,梅花,方块),十三种点数(2-10,J,Q,K),不考虑大小王。2、创建两个玩家。包括玩家
- java修改JFrame默认字体修改默认字体的方法很简单。首先我们随便写一个按钮出来:import javax.swing.*; publi
- 一、背景在开发过程中,我们的软件会面对不同的运行环境,比如开发环境、测试环境、生产环境,而我们的软件在不同的环境中,有的配置可能会不一样,比
- 启动第二个服务时就会报如下的错误:Server failed to start for port 8080: Address already
- 本文实例讲述了Android实现的数字格式化用法。分享给大家供大家参考,具体如下:package formatnumber;import j
- 使用范围synchronized使用上用于同步方法或者同步代码块在锁实现上是基于对象去实现使用中用于对static修饰的便是class类锁使