Kotlin协程低级api startCoroutine与ContinuationInterceptor
作者:Pika 发布时间:2022-05-04 21:45:50
聊一聊kotlin协程“低级”api
Kotlin协程已经出来很久了,相信大家都有不同程度的用上了,由于最近处理的需求有遇到协程相关,因此今天来聊一Kotlin协程的“低级”api,首先低级api并不是它真的很“低级”,而是kotlin协程库中的基础api,我们一般开发用的,其实都是通过低级api进行封装的高级函数,本章会通过低级api的组合,实现一个自定义的async await 函数(下文也会介绍kotlin 高级api的async await),涉及的低级api有startCoroutine ,ContinuationInterceptor 等
startCoroutine
我们知道,一个suspend关键字修饰的函数,只能在协程体中执行,伴随着suspend 关键字,kotlin coroutine common库(平台无关)也提供出来一个api,用于直接通过suspend 修饰的函数直接启动一个协程,它就是startCoroutine
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
作为Receiver
receiver: R,
当前协程结束时的回调
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
可以看到,它的Receiver是(suspend R.() -> T),即是一个suspend修饰的函数,那么这个有什么作用呢?我们知道,在普通函数中无法调起suspend函数(因为普通函数没有隐含的Continuation对象,这里我们不在这章讲,可以参考kotlin协程的资料)
但是普通函数是可以调起一个以suspend函数作为Receiver的函数(本质也是一个普通函数)
其中startCoroutine就是其中一个,本质就是我们直接从外部提供了一个Continuation,同时调用了resume方法,去进入到了协程的世界
startCoroutine实现
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
这个原理我们就不细讲下去原理,之前也有写过相关的文章。通过这种调用,我们其实就可以实现在普通的函数环境,开启一个协程环境(即带有了Continuation),进而调用其他的suspend函数。
ContinuationInterceptor
我们都知道 * 的概念,那么kotlin协程也有,就是ContinuationInterceptor,它提供以AOP的方式,让外部在resume(协程恢复)前后进行自定义的拦截操作,比如高级api中的Diapatcher就是。当然什么是resume协程恢复呢,可能读者有点懵,我们还是以上图中出现的mySuspendFunc举例子
mySuspendFunc是一个suspned函数
::mySuspendFunc.startCoroutine(object : Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
}
})
它其实等价于
val continuation = ::mySuspendFunc.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
Log.e("hello","当前协程执行完成的回调")
}
})
continuation.resume(Unit)
startCoroutine方法就相当于创建了一个Continuation对象,并调用了resume。创建Continuation可通过createCoroutine方法,返回一个Continuation,如果我们不调用resume方法,那么它其实什么也不会执行,只有调用了resume等执行方法之后,才会执行到后续的协程体(这个也是协程内部实现,感兴趣可以看看之前文章)
而我们的 * ,就相当于在continuation.resume前后,可以添加自己的逻辑。我们可以通过继承ContinuationInterceptor,实现自己的 * 逻辑,其中需要复写的方法是interceptContinuation方法,用于返回一个自己定义的Continuation对象,而我们可以在这个Continuation的resumeWith方法里面(当调用了resume之后,会执行到resumeWith方法),进行前后打印/其他自定义操作(比如切换线程)
class ClassInterceptor() :ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>):Continuation<T> by continuation{
override fun resumeWith(result: Result<T>) {
Log.e("hello","MyContinuation start ${result.getOrThrow()}")
continuation.resumeWith(result)
Log.e("hello","MyContinuation end ")
}
}
其中的key是ContinuationInterceptor,协程内部会在每次协程恢复的时候,通过coroutineContext取出key为ContinuationInterceptor的 * ,进行拦截调用,当然这也是kotlin协程内部实现,这里简单提一下。
实战
kotlin协程api中的 async await
我们来看一下kotlin Coroutine 的高级api async await用法
CoroutineScope(Dispatchers.Main).launch {
val block = async(Dispatchers.IO) {
// 阻塞的事项
}
// 处理其他主线程的事务
// 此时必须需要async的结果时,则可通过await()进行获取
val result = block.await()
}
我们可以通过async方法,在其他线程中处理其他阻塞事务,当主线程必须要用async的结果的时候,就可以通过await等待,这里如果结果返回了,则直接获取值,否则就等待async执行完成。这是Coroutine提供给我们的高级api,能够将任务简单分层而不需要过多的回调处理。
通过startCoroutine与ContinuationInterceptor实现自定义的 async await
我们可以参考其他语言的async,或者Dart的异步方法调用,都有类似这种方式进行线程调用
async {
val result = await {
suspend 函数
}
消费result
}
await在async作用域里面,同时获取到result后再进行消费,async可以直接在普通函数调用,而不需要在协程体内,下面我们来实现一下这个做法。
首先我们想要限定await函数只能在async的作用域才能使用,那么首先我们就要定义出来一个Receiver,我们可以在Receiver里面定义出自己想要暴露的方法
interface AsyncScope {
fun myFunc(){
}
}
fun async(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend AsyncScope.() -> Unit
) {
// 这个有两个作用 1.充当receiver 2.completion,接收回调
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
注意这个类,resumeWith 只会跟startCoroutine的这个协程绑定关系,跟await的协程没有关系
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
// 这个是干嘛的 == > 完成的回调
Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}
}
上面我们定义出来一个async函数,同时定义出来了一个AsyncStub的类,它有两个用处,第一个是为了充当Receiver,用于规范后续的await函数只能在这个Receiver作用域中调用,第二个作用是startCoroutine函数必须要传入一个参数completion,是为了收到当前协程结束的回调resumeWith中可以得到当前协程体结束回调的信息
await方法里面
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
// 自定义的Receiver函数
myFunc()
Thread{
切换线程执行await中的方法
it.resumeWith(Result.success(block()))
}.start()
}
在await中,其实是一个扩展函数,我们可以调用任何在AsyncScope中定义的方法,同时这里我们模拟了一下线程切换的操作(Dispatcher的实现,这里不采用Dispatcher就是想让大家知道其实Dispatcher.IO也是这样实现的),在子线程中调用it.resumeWith(Result.success(block())),用于返回所需要的信息
通过上面定的方法,我们可以实现
async {
val result = await {
suspend 函数
}
消费result
}
public interface ContinuationInterceptor : CoroutineContext.Element
//而CoroutineContext.Element又是继承于CoroutineContext
CoroutineContext.Element:CoroutineContext
而我们的 * ,正是CoroutineContext的子类,我们把上文的ClassInterceptor修改一下
class ClassInterceptor() : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
Continuation<T> by continuation {
private val handler = Handler(Looper.getMainLooper())
override fun resumeWith(result: Result<T>) {
Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
handler.post {
continuation.resumeWith(Result.success(自定义内容))
}
Log.e("hello", "MyContinuation end ")
}
}
同时把async默认参数CoroutineContext实现一下即可
fun async(
context: CoroutineContext = ClassInterceptor(),
block: suspend AsyncScope.() -> Unit
) {
// 这个有两个作用 1.充当receiver 2.completion,接收回调
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
此后我们就可以直接通过,完美实现了一个类js协程的调用,同时具备了自动切换线程的能力
async {
val result = await {
test()
}
Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}")
}
结果
E start
E MyContinuation start kotlin.Unit
E MyContinuation end
E end
E 执行阻塞函数 test 1923
E MyContinuation start 自定义内容数值
E MyContinuation end
E result is 自定义内容的数值 true
E AsyncStub resumeWith 2 kotlin.Unit
最后,这里需要注意的是,为什么 * 回调了两次,因为我们async的时候开启了一个协程,同时await的时候也开启了一个,因此是两个。AsyncStub只回调了一次,是因为AsyncStub被当作complete参数传入了async开启的协程block.startCoroutine,因此只是async中的协程结束才会被回调。
本章代码
class ClassInterceptor() : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
Continuation<T> by continuation {
private val handler = Handler(Looper.getMainLooper())
override fun resumeWith(result: Result<T>) {
Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
handler.post {
continuation.resumeWith(Result.success(6 as T))
}
Log.e("hello", "MyContinuation end ")
}
}
interface AsyncScope {
fun myFunc(){
}
}
fun async(
context: CoroutineContext = ClassInterceptor(),
block: suspend AsyncScope.() -> Unit
) {
// 这个有两个作用 1.充当receiver 2.completion,接收回调
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
// 这个是干嘛的 == > 完成的回调
Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}
}
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
myFunc()
Thread{
it.resumeWith(Result.success(block()))
}.start()
}
模拟阻塞
fun test(): Int {
Thread.sleep(5000)
Log.e("hello", "执行阻塞函数 test ${Thread.currentThread().id}")
return 5
}
async {
val result = await {
test()
}
Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}")
}
最后
我们通过协程的低级api,实现了一个与官方库不同版本的async await,同时也希望通过对低级api的设计,也能对Coroutine官方库的高级api的实现有一定的了解。
来源:https://juejin.cn/post/7172813333148958728


猜你喜欢
- 抽象类和抽象方法常用知识点:(1)抽象类作为被继承类,子类必须实现抽象类中的所有抽象方法,除非子类也为抽象类。也就是说,如果子类也为抽象类,
- 表关联上一篇介绍了JPA的简单使用,这一篇介绍JPA在表关联上的使用一对一配置参数JPA对于数据实体一对一映射使用的是@OneToOne注解
- 前言:上午写代码时还好好的,下午不知道怎么回事突然就不显示logcat日志了,觉得很奇怪,于是开始找各种解决办法!现象如图所示,logcat
- 背景先上图由此可见,非自旋锁如果拿不到锁会把线程阻塞,直到被唤醒;自旋锁拿不到锁会一直尝试为什么要这样?好处阻塞和唤醒线程都是需要高昂的开销
- 先看下面的这组字符,如果输出来,它是无法靠右对齐: Source Codestring[] s1 = { "300",
- Mybatis获取参数值的两种方式:${},#{}${}本质:字符串拼接,注意:单引号要加上#{}:本质:占位符赋值一、 Mybatis获取
- 底座的状态跟充电状态类似,很多底座提供充电功能(座充).底座状态同样使用sticky Intent广播。可以查询设备是否插入底座,哪种底座。
- 目录引言什么是Span关于String的一段性能提升测试代码最终性能对比写在最后引言C# 是一门现代化的编程语言,与Java十分的相似。熟练
- 在页面提交到tomcat乱码 解决方法是在tomcat/conf/server.xml中进行配置以tomcat6.0.32为例,需将以下代码
- 本文实例为大家分享了C#实现单位换算器的具体代码,供大家参考,具体内容如下一、阐述进制间转换:十六进制、十进制、八进制、二进制。长度间转换:
- 前言在使用IDEA2020.2.3版本时,创建web工程遇到了一些问题,经过一番摸索之后得到解决方案。一、新建javaweb工程1.先创建一
- 概述在早期的 Java 版本中,文件 IO 操作功能一直相对较弱,主要存在以下问题:缺乏对现代文件系统的支持:只提供的基础的文件操作,不支持
- 1.android中利用webview调用网页上的js代码。Android 中可以通过webview来实现和js的交互,在程序中调用js代码
- 在传统的单服务架构中,一般来说,只有一个服务器,那么不存在 Session共享问题,但是在分布式/集群项目中,Session 共享则是一个必
- 本文实例为大家分享了Android下载进度监听和通知的具体代码,供大家参考,具体内容如下下载管理器关于下载进度的监听,这个比较简单,以apk
- 今天带大家实现滑动返回效果.,具体内容如下所示:先看看效果图:因为没有具体内容,也没有简书的图片资源,所以稍微简陋了点.但是依然不妨碍我们的
- 1、本篇内容本文让大家掌握 springmvc 中异步处理请求,特别牛逼的一个功能,大家一定要掌握。2、看段代码,分析问题@Response
- 先给出网页地址:https://wall.alphacoders.com/featured.php?lang=Chinese主要步骤:利用J
- 一、场景Android项目的开发过程中,我们项目中的gradle的dependencies闭包中会引入很多三方依赖库,引入的库越多,越容易产
- 本文实例为大家分享了java实现文件归档和还原的具体代码,供大家参考,具体内容如下基本思路: 文件归档,换句话就是把多个文件的字节