Java多线程 CompletionService
作者:冬日毛毛雨 发布时间:2022-08-19 18:51:32
目录
1 CompletionService介绍
2 CompletionService源码分析
3 CompletionService实现任务
4 CompletionService总结
1 CompletionService介绍
CompletionService
用于提交一组Callable
任务,其take方法返回已完成的一个Callable
任务对应的Future
对象。
如果你向Executor
提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以将每个任务的Future
保存进一个集合,然后循环这个集合调用Future
的get()
取出数据。幸运的是CompletionService
帮你做了这件事情。CompletionService
整合了Executor
和BlockingQueue
的功能。你可以将Callable
任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future
。CompletionService
的take返回的future
是哪个先完成就先返回哪一个,而不是根据提交顺序。
2 CompletionService源码分析
首先看一下 构造方法:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
构造法方法主要初始化了一个阻塞队列,用来存储已完成的task
任务。
然后看一下 completionService.submit
方法:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
可以看到,callable
任务被包装成QueueingFuture
,而 QueueingFuture
是 FutureTask
的子类,所以最终执行了FutureTask
中的run()
方法。
来看一下该方法:
public void run() {
//判断执行状态,保证callable任务只被运行一次
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//这里回调我们创建的callable对象中的call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//处理执行结果
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到在该 FutureTask
中执行run
方法,最终回调自定义的callable
中的call
方法,执行结束之后,
通过 set(result)
处理执行结果:
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
继续跟进finishCompletion()
方法,在该方法中找到 done()
方法:
protected void done() { completionQueue.add(task); }
可以看到该方法只做了一件事情,就是将执行结束的task
添加到了队列中,只要队列中有元素,我们调用take()
方法时就可以获得执行的结果。
到这里就已经清晰了,异步非阻塞获取执行结果的实现原理其实就是通过队列来实现的,FutureTask
将执行结果放到队列中,先进先出,线程执行结束的顺序就是获取结果的顺序。
CompletionService
实际上可以看做是Executor
和BlockingQueue
的结合体。CompletionService
在接收到要执行的任务时,通过类似BlockingQueue
的put和take获得任务执行的结果。CompletionService
的一个实现是ExecutorCompletionService
,ExecutorCompletionService
把具体的计算任务交给Executor
完成。
在实现上,ExecutorCompletionService
在构造函数中会创建一个BlockingQueue
(使用的基于链表的 * 队列LinkedBlockingQueue),该BlockingQueue
的作用是保存Executor
执行的结果。当计算完成时,调用FutureTask
的done方法。当提交一个任务到ExecutorCompletionService
时,首先将任务包装成QueueingFuture
,它是FutureTask
的一个子类,然后改写FutureTask
的done方法,之后把Executor
执行的计算结果放入BlockingQueue
中。
QueueingFuture
的源码如下:
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
3 CompletionService实现任务
public class CompletionServiceTest {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
for (int i = 1; i <=10; i++) {
final int seq = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
threadPool.shutdown();
for (int i = 0; i < 10; i++) {
try {
System.out.println(
completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
7
3
9
8
1
2
4
6
5
10
4 CompletionService总结
相比ExecutorService
,CompletionService
可以更精确和简便地完成异步任务的执行CompletionService
的一个实现是ExecutorCompletionService
,它是Executor
和BlockingQueue
功能的融合体,Executor
完成计算任务,BlockingQueue
负责保存异步任务的执行结果
在执行大量相互独立和同构的任务时,可以使用CompletionService
CompletionService
可以为任务的执行设置时限,主要是通过BlockingQueue
的poll
(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务
来源:https://juejin.cn/post/7018423693793558558


猜你喜欢
- Java中的Runnable,Callable,Future,FutureTask的比较Java中存在Runnable、Callable、F
- Eclipse 开发java 出现Failed to create the Java Virtual Machine错误解决办法一直用Ecl
- primary_text_yellow.xml <?xml version="1.0" encoding=&quo
- 一般在实际项目的开发中,会要求涉及日志记录的问题,比较常用的有Log4Net,NLog等几个,而小项目小工具的话,则无需费此大驾。而譬如串口
- java如何实现ftp上传?如何创建文件夹?最佳答案:准备条件:java实现ftp上传用到了commons-net-3.3.jar包首先建立
- using System; using System.Collections; using System.Text; using Syste
- 本文实例讲述了C# winform实现右下角弹出窗口结果的方法。分享给大家供大家参考,具体如下:using System.Runtime.I
- 本文实例为大家分享了C#实现语音播报功能的具体代码,供大家参考,具体内容如下环境:window10vs2019 16.5.5.netfram
- 下面通过代码看下JAVA查询树结构数据(省市区)使用hutool工具实现代码:@PostMapping("/getTree&quo
- 本文实例讲述了C#实现对数组进行随机排序类。分享给大家供大家参考。具体如下:这个一个扩充C#随机数发生器的类,可以随机生成指定范围的数字,可
- 一、SpringBoot 指定配置文件路径:在 SpringBoot 中,可以将配置文件放在 jar 包外面,这样可以方便地修改配置而不需要
- 什么是jdkjdk是什么呢?jdk的是java development kit的缩写,意思是java程序开发的工具包。也可以说jdk是jav
- Java的动态绑定所谓的动态绑定就是指程执行期间(而不是在编译期间)判断所引用对象的实际类型,根据其实际的类型调用其相应的方法。java继承
- 简介Android Studio升级到3.0后,有不少的改动和新特性,先贴出官方的迁移说明。本文会持续收集与总结本人在使用Android S
- 本文实例讲述了Android编程设计模式之访问者模式。分享给大家供大家参考,具体如下:一、介绍访问者模式是一种将数据操作与数据结构分离的设计
- 使用自定义注解实现接口限流在高并发系统中,保护系统的三种方式分别为:缓存,降级和限流。限流的目的是通过对并发访问请求进行限速或者一个时间窗口
- 一:item的根布局设置Android:clickable="true",之后导致item点击事件失效,对根
- 这篇文章主要介绍了SpringBoot整合Druid数据源过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价
- public static string encode(string str) { &
- 引入依赖<dependency> <groupId>org.apache.tika</groupI