Kotlin协程Job生命周期结构化并发详解
作者:无糖可乐爱好者 发布时间:2023-05-23 10:08:17
引言
前面在学习协程启动方式的时候在launch
的源码中有一个返回值是Job
,async
的返回Deferred
也是实现了Job
,那么而也就是说launch
和async
在创建一个协程的时候也会创建一个对应的Job
对象。还提到过Job
是协程的句柄,那么Job
到底是什么?它有什么用?
1.Job的生命周期
先看一下Job
的源码,这里只保留了跟标题相关的内容
public interface Job : CoroutineContext.Element {
// ------------ 状态查询API ------------
/**
* 当该Job处于活动状态时,返回true——它已经开始,没有完成,也没有取消。
* 如果没有取消或失败,等待其子任务完成的Job仍被认为是活动的。
*/
public val isActive: Boolean
/**
* 当Job因任何原因完成时返回true。作业被取消或失败并已完成其执行也被视为完成。
* Job只有在所有子任务完成后才算完成。
*/
public val isCompleted: Boolean
/**
*如果该作业因任何原因被取消,无论是通过显式调用cancel,还是因为它失败或其子或父作业被取消,
* 则返回true。在一般情况下,它并不意味着任务已经完成,因为它可能仍然在完成它正在做的事情,
* 并等待它的子任务完成。
*/
public val isCancelled: Boolean
// ------------ 操控状态API ------------
/**
* 如果Job所在的协程还没有被启动那么调用这个方法就会启动协程
* 如果这个协程被启动了返回true,如果已经启动或者执行完毕了返回false
*/
public fun start(): Boolean
/**
* 取消此Job,可用于指定错误消息或提供有关取消原因的其他详细信息
*/
public fun cancel(cause: CancellationException? = null)
/**
* 取消此Job
*/
public fun cancel(): Unit = cancel(null)
public fun cancel(cause: Throwable? = null): Boolean
// ------------ 等待状态API ------------
/**
* 挂起协程,知道任务完成再恢复
*/
public suspend fun join()
// ------------ 完成状态回调API ------------
/**
* 注册Job完成时同步调用的处理程序.
* 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null
* 否则,该处理程序将在此Job完成时调用一次。
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
/**
* 注册在取消或完成此Job时同步调用的处理程序。
* 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null,
* 除非将invokeImmediately设置为false。否则,
* 当Job取消或完成时将调用一次handler。
*/
public fun invokeOnCompletion(
onCancelling: Boolean = false,
invokeImmediately: Boolean = true,
handler: CompletionHandler): DisposableHandle
}
从源码中可以发现这几个函数和变量跟Actviity或者Fragment非常像,所以我们可以总结出两个结论:
Job可以监测协程的生命周期
Job可以操控协程
在例子中使用这几个函数和变量再来校验一下上面的结论:
fun main() = runBlocking {
val job = launch {
delay(1000L)
}
job.log()
job.cancel()
job.log()
}
fun Job.log() {
println(
"""
isActive:$isActive
isCompleted:$isCompleted
isCancelled:$isCancelled
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1
//================================
Job.log
用了扩展函数,方便调用Job
中的状态监测返回值。
上面的代码通过launch
创建了一个协程,接收了Job
的返回值,这里用这个job
对象做了三件事:
第一个
job.log()
:launch
的创建标志着协程已经被启动所以在第一个job.log()
的日志中isActive
返回值是true;job.cancel()
: 这里调用了job
的取消函数将协程任务取消;第二个
job.log()
: 上面的代码将协程任务取消了,然后再次获取协程状态发现isActivte
返回false,isCancelled
返回true。
上面的代码也印证了前面提出的结论,还有一个函数start
没使用,再来调用它之后输出的日志:
fun main() = runBlocking {
//变化1
val job = launch(start = CoroutineStart.LAZY) {
delay(1000L)
}
job.log()
//变化2
job.start()
job.log()
job.cancel()
job.log()
}
fun Job.log() {
println(
"""
isActive:$isActive
isCompleted:$isCompleted
isCancelled:$isCancelled
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:false
//isCancelled:true
//Thread:main @coroutine#1
//================================
上面的代码增加了两处修改:
变化1:协程在创建出来的时候就已经被启动,因此为了查看调用
Job.start()
前的日志需要加上懒启动变化2:调用
start
函数启动协程
从输出结果来看没有调用start
函数前isActive
返回true,调用后就返回了true
,当使用懒启动后在调用cancel
函数与前面使用cancel
函数输出的日志是一样的,可以得知懒启动后对协程的生命周期并没有设么影响(这可能是句废话)。
现在还有最后一个变量没有看isCompleted
,在上面的代码中添加一个延时函数,等协程任务结束再打印日志
fun main() = runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
delay(1000L)
}
job.log()
job.start()
job.log()
job.cancel()
delay(2000L)//变化在这里
job.log()
}
fun Job.log() {
println(
"""
isActive:$isActive
isCompleted:$isCompleted
isCancelled:$isCancelled
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:false
//isCompleted:true
//isCancelled:true
//Thread:main @coroutine#1
//================================
从输出结果中看到当调用isCancel
后isCompleted
也返回了true,也就是说任务结束了。
上面的代码为了监测isCompleted
的状态加了一个延时函数delay
,但是这种方式并不建议使用,因为这个时间他不是固定的,例如从后台请求数据或者下载文件,这种情况下的时间是完全无法预知的。
现在假设已经知道协程执行完毕需要delay(1000L)
的时间,如果将协程内的delay
时长设置的大于外部的delay
时长,会带来什么问题?
fun main() = runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
delay(4000L)
}
job.log()
job.start()
job.log()
delay(1000L)
job.log()
println("Process end!")
}
fun Job.log() {
println(
"""
isActive:$isActive
isCompleted:$isCompleted
isCancelled:$isCancelled
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//Process end!
由输出结果可知isCompleted
状态是false,协程任务是否执行完毕不得而知。另外当println("Process end!")
执行完毕后程序并没有立即输出Process finished with exit code 0
,这是因为runBlocking 会一直阻塞,等到 job 任务执行完毕以后才真正退出。
那要如何解决这个问题?
//Job#join
/**
* 挂起协程,知道任务完成再恢复
*/
public suspend fun join()
join
是Job
中的一个挂起函数,调用后会挂起当前程序的执行流程,等待job
当中的协程任务执行完毕然后再恢复当前程序的执行流程。
join
将任务挂起后再恢复,那要如何知道任务是否执行完毕了?invokeOnCompletion
可以监听任务执行的状态
//Job#invokeOnCompletion
/**
* 注册Job完成时同步调用的处理程序.
* 当Job已经完成时,将处理程序将立即调用Job的异常或取消原因或null
* 否则,该处理程序将在此Job完成时调用一次。
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
//Job#invokeOnCompletion
/**
* 注册在取消或完成此Job时同步调用的处理程序。
* 当Job已经被取消并完成执行时,处理程序将立即调用Job的取消原因或null,
* 除非将invokeImmediately设置为false。否则,
* 当Job取消或完成时将调用一次handler。
*/
public fun invokeOnCompletion(
onCancelling: Boolean = false,
invokeImmediately: Boolean = true,
handler: CompletionHandler): DisposableHandle
join
和invokeOnCompletion
的使用如下:
fun main() = runBlocking {
val job = launch(start = CoroutineStart.LAZY) {
delay(4000L)
}
job.log()
job.start()
job.log()
//新增
job.join()
//新增
job.invokeOnCompletion {
println("==========Task status==========")
job.log()
}
println("Process end!")
}
fun Job.log() {
println(
"""
isActive:$isActive
isCompleted:$isCompleted
isCancelled:$isCancelled
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果:
//isActive:false
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//isActive:true
//isCompleted:false
//isCancelled:false
//Thread:main @coroutine#1
//================================
//==========Task status==========
//isActive:false
//isCompleted:true
//isCancelled:false
//Thread:main @coroutine#1
//================================
//Process end!
可以看到加入join
和invokeOnCompletion
之后isCompleted
的状态就正确了,同时Process end!
输出后Process finished with exit code 0
也会很快的输出,这说明任务确实执行完毕了。
在讲协程的启动方式的时候提出一个观点:launch
的返回值Job
代表的是协程的句柄。那么Job
是协程的句柄该怎么理解?
句柄: 是指一个中间媒介,可以操控一个东西。就类似于遥控器操作空调场景中遥控器就是句柄,开关控制灯具场景中开关就是句柄。
所以Job
和协程的关系就类似于遥控器和空调,开关和灯具。Job
可以监测协程的运行状态也可以控制协程的运行状态。那么Job
就和遥控器、开关一样看做是一个句柄。
2.Deffered
launch
直接创建了Job
,async
通过Deffered
间接创建了Job
对象,但是它并没有在 Job
的基础上扩展出很多其他功能,而接收一个返回值是依靠 await()
方法,那await
方法是如何实现的?
fun main() = runBlocking {
val deferred = async {
logX("Coroutine start!")
delay(1000L)
logX("Coroutine end!")
"Coroutine result!"
}
val result = deferred.await()
println("Result = $result")
logX("Process end!")
}
fun logX(any: Any?) {
println(
"""
================================
$any
Thread:${Thread.currentThread().name}
================================
""".trimIndent()
)
}
//输出结果:
//Coroutine start!
//Thread:main @coroutine#2
//================================
//================================
//Coroutine end!
//Thread:main @coroutine#2
//================================
//Result = Coroutine result!
//================================
//Process end!
//Thread:main @coroutine#1
从输出结果来看,await
方法可以获取协程执行结果外,好像还会阻塞协程的执行流程,直到协程任务执行完毕。看一下await
的源码
//Deferred#await
public interface Deferred<out T> : Job {
...
public suspend fun await(): T
...
}
从源码来看await
也是一个挂起函数,它跟join
是一样的,看似阻塞的过程其实是协程的挂起和恢复能力。
所以,总的来说,Deferred
只是比 Job
多了一个 await()
挂起函数而已,通过这个挂起函数,就可以等待协程执行完毕的同时,还可以直接拿到协程的执行结果。
3.Job与结构化并发
在其他地方看过这么一句话:协程的优势在于结构化并发, 这句话该如何理解?
这句话可以理解为带有结构和层级的并发,用代码表现就像这样:
fun main() = runBlocking {
val parentJob: Job
var childJob1: Job? = null
var childJob2: Job? = null
var childJob3: Job? = null
parentJob = launch {
childJob1 = launch {
delay(1000L)
}
childJob2 = launch {
delay(3000L)
}
childJob3 = launch {
delay(5000L)
}
}
delay(500L)
parentJob.children.forEachIndexed { index, job ->
when (index) {
0 -> println("childJob1 === childJob1 is ${childJob1 === job}")
1 -> println("childJob2 === childJob2 is ${childJob2 === job}")
2 -> println("childJob3 === childJob3 is ${childJob3 === job}")
}
}
parentJob.join()
logX("Process end!")
}
//输出结果:
//childJob1 === childJob1 is true
//childJob2 === childJob2 is true
//childJob3 === childJob3 is true
//================================
//Process end!
//Thread:main @coroutine#1
上面的代码是父子层级,父Job
使用launch
启动了协程同时它的内部还有三个Job
,三个子Job
是并发执行的,同时也是用过launch
启动的协程,调用了parentJob.join()
那么挂起的时间就是childJob3
的时长—5秒,因为它要等待所有任务都执行完毕才会恢复执行,然后通过children.forEachIndexed
进行遍历并分别对比他们与三个子Job
的引用是否相等“===”代表了引用相等,即是否是同一个对象)。图示如下
前面讲过,Job
可以调用cancel
方法取消执行,那么当调用parentJob.cancel
会有什么样的情况?
fun main() = runBlocking {
val parentJob: Job
var childJob1: Job? = null
var childJob2: Job? = null
var childJob3: Job? = null
parentJob = launch {
childJob1 = launch {
println("childJob1 start")
delay(1000L)
println("childJob1 end")
}
childJob2 = launch {
println("childJob2 start")
delay(3000L)
println("childJob2 start")
}
childJob3 = launch {
println("childJob3 start")
delay(5000L)
println("childJob3 start")
}
}
delay(500L)
parentJob.cancel()
logX("Process end!")
}
//输出结果:
//childJob1 start
//childJob2 start
//childJob3 start
//================================
//Process end!
//Thread:main @coroutine#1
parentJob.cancel
调用后,每个子Job
只是输出了start,这就可以得出一个结论:父Job
取消后子Job
也会依次跟着取消。如果调用任何一个子Job
的cancel
则不会对父Job
和其他子Job
产生影响。
到这里对于开头的那句协程的优势在于结构化并发就有更更好的理解了,这是Kotlin协程的第二大优势。
4.launch和async的使用场景
launch: 主要用来发起一些不需要任何结果的耗时任务,这个任务在执行中可以改变它的执行状态。
async: 主要用来发起一些需要结果的耗时任务,以及与挂起函数结合,优化并发。
来源:https://juejin.cn/post/7173122172930490382


猜你喜欢
- 在spring boot里,很吸引人的一个特性是可以直接把应用打包成为一个jar/war,然后这个jar/war是可以直接启动的,而不需要另
- 使用Spring提供的CommandLineRunner接口可以实现了一个命令行应用程序。但是,参数/选项/参数处理却不是那么好。幸运的是,
- Java Function的使用一、方法介绍表示接受一个参数并产生结果的函数。参数类型 T - 函数输入的类型R - 函数的结果类型方法介绍
- PS:本文包含了大部分strings函数的说明,并附带举例说明。本来想自己整理一下的,发现已经有前辈整理过了,就转了过来。修改了原文一些源码
- 一.数组的三种声明方式总结public class WhatEver { public static void main(Str
- 具体代码如下所示:private string GetWeekName(DayOfWeek week) { &nb
- 本文实例讲述了Java实现储存对象并按对象某属性排序的几种方法。分享给大家供大家参考,具体如下:在编程的时候,经常会出现对某一种类的对象们按
- 我的电脑环境win10vscode 1.36.1vscode安装插件安装完这个插件后会提示你安装 platformIOCore,按照提示安装
- 这篇文章主要介绍了java实现上传文件类型检测过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 1、分布式锁简介分布式锁是控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,
- 引言依照领导要求区分了两种压测模式:固定次数压测和固定时间压测。此前一直沿用的都是固定次数,所以本次第二版剥离了固定次数的模式增加了固定时间
- 题目描述:给定一 m*n 的矩阵,请按照逆时针螺旋顺序,返回矩阵中所有元素。示例:思路:这是一道典型的模拟问题:我们可以分析一下,遍历前进轨
- 设置session有效时间的三种方式session的默认有效时间是30分钟(min)方法一使用java函数:session.setMaxIn
- 想做一个上传图片的功能,来展示用户上传的图片。在返回给前端的URL上弄了好久,前端一直无法访问到URL,结果一直显示404。 倒腾了一上午发
- 代码中已经加入了注释,需要的朋友可以直接参考代码中的注释。下面直接上功能实现的主要代码:import java.io.File;import
- 1. 配置 * 具体步骤:编写一自定义 * 类实现接口 HandlerInterceptorHandlerInterceptor 接口: 可
- 导航和路由Flutter提供了一个完整的用于在屏幕之间导航和处理深层链接的系统。没有复杂深度链接的小型应用程序可以使用Navigator,而
- 本文实例讲述了C#直线的最小二乘法线性回归运算方法。分享给大家供大家参考。具体如下:1.Point结构在编写C#窗体应用程序时,因为引用了S
- for循环语句重复执行语句,直到条件变为 false。语法for ( init-expression ; cond-expression ;
- 本文实例为大家分享了C#异步调用的具体代码,供大家参考,具体内容如下using System;using System.Collection