软件编程
位置:首页>> 软件编程>> Android编程>> Kotlin协程操作之创建启动挂起恢复详解

Kotlin协程操作之创建启动挂起恢复详解

作者:LeeDuo.  发布时间:2022-07-01 09:25:25 

标签:Kotlin,协程,创建,启动,挂起,恢复

下面以launch方法为例进行分析。

一.协程的创建

launch方法的代码如下:

// CoroutineScope的扩展方法
public fun CoroutineScope.launch(
   context: CoroutineContext = EmptyCoroutineContext,
   start: CoroutineStart = CoroutineStart.DEFAULT,
   block: suspend CoroutineScope.() -> Unit
): Job {
   // 根据当前上下文,计算得到新的上下文
   val newContext = newCoroutineContext(context)
   // 根据启动模式,创建不同的续体
   val coroutine = if (start.isLazy)
       LazyStandaloneCoroutine(newContext, block) else
       StandaloneCoroutine(newContext, active = true)
   // 启动协程
   coroutine.start(start, coroutine, block)
   return coroutine
}

newCoroutineContext用于计算新的上下文,代码如下:

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
   // coroutineContext为CoroutineScope中保存的全局变量
   // 对上下文进行相加
   val combined = coroutineContext + context
   // 用于debug
   val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
   // 如果上下文中没有调度器,则添加一个默认的调度器
   return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
       debug + Dispatchers.Default else debug
}

1.start方法

在不指定协程启动模式的情况下,协程将按照DEFAULT模式启动,在上述代码中,会调用StandaloneCoroutine对象的start方法。StandaloneCoroutine的代码如下:

private open class StandaloneCoroutine(
   parentContext: CoroutineContext,
   active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
   override fun handleJobException(exception: Throwable): Boolean {
       handleCoroutineException(context, exception)
       return true
   }
}

StandaloneCoroutine类中仅重写了handleJobException方法,用于处理父协程不处理的异常。因此这里调用的start方法实际是父类AbstractCoroutine的方法,AbstractCoroutine类的start方法代码如下:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
   // 该方法用于完成父协程与子协程的绑定关联,同时确保父协程启动
   initParentJob()
   // 该方法的写法等同于start.invoke(block, receiver, this)
   // 因此调用的CoroutineStart类的方法
   start(block, receiver, this)
}

AbstractCoroutine类的start方法内,调用了CoroutineStart类的invoke方法。

2.CoroutineStart类

CoroutineStart是一个枚举类,用于根据不同的启动模式去启动协程,代码如下:

public enum class CoroutineStart {
   // 四种启动模式
   DEFAULT,
   LAZY,
   // 具有实验性,慎用
   @ExperimentalCoroutinesApi
    * IC,
   // 具有实验性,慎用
   @ExperimentalCoroutinesApi
   UNDISPATCHED;
   // 根据不同的启动策略,启动协程,执行block
   @InternalCoroutinesApi
   public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
       when (this) {
           DEFAULT -> block.startCoroutineCancellable(completion)
            * IC -> block.startCoroutine(completion)
           UNDISPATCHED -> block.startCoroutineUndispatched(completion)
           LAZY -> Unit // 该模式不主动启动,等待用户调用start方法
       }
   // 根据不同的启动策略,启动协程,执行block
   @InternalCoroutinesApi
   public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
       when (this) {
           DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            * IC -> block.startCoroutine(receiver, completion)
           UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
           LAZY -> Unit
       }
   // 当前的启动模式是否为懒启动
   @InternalCoroutinesApi
   public val isLazy: Boolean get() = this === LAZY
}

CoroutineStart类中有两个invoke方法,其中一个参数中有receiver,另一个没有receiver。在Kotlin协程中,很多方法都重载了带有receiver的方法和不带有receiver的方法。

receiver用于为block执行提供一个环境。Kotlin中提供的启动协程的方法都是通过带receiver参数的start方法实现。通过receiver环境,可以更方便的实现一些操作,比如在launch启动的协程中再次调用launch启动新的协程。在没有receiver的环境下执行block,则更像是在suspend方法中执行,如果需要启动其他的协程,需要自己提供环境。

3.startCoroutineCancellable方法

startCoroutineCancellable是一个扩展方法,用来创建一个可以取消的协程,代码如下:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
   runSafely(completion) {
       // createCoroutineUnintercepted:创建协程
       // intercepted:拦截调度
       // resumeCancellableWith:恢复执行
       createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
   }
// 如果创建的过程发生异常,则通知续体恢复后续代码的执行
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
   try {
       block()
   } catch (e: Throwable) {
       completion.resumeWith(Result.failure(e))
   }
}

4.createCoroutineUnintercepted方法

createCoroutineUnintercepted方法用于创建一个新的、可挂起的、不受干扰的协程。

public expect fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
   receiver: R,
   completion: Continuation<T>
): Continuation<Unit>

在Kotlin中有很多被expect关键字标记的接口方法,需要找到对应平台下被actual标记的实现方法。

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
   receiver: R,
   completion: Continuation<T>
): Continuation<Unit> {
   // 用于debug
   val probeCompletion = probeCoroutineCreated(completion)
   return if (this is BaseContinuationImpl)
       create(receiver, probeCompletion)
   else {
       createCoroutineFromSuspendFunction(probeCompletion) {
           (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
       }
   }
}

createCoroutineUnintercepted方法创建的协程需要手动调用resumeWith方法才可以启动,但重复的调用resumeWith方法可能会导致状态机发生异常。同时,参数中传入的completion可能会在任意的上下文中被调用。

正常情况下,我们编写的lambda表达式&mdash;&mdash;block,在编译器编译时,会自动生成一个类,并继承SuspendLambda类,实现Continuation等接口。因为SuspendLambda继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,所以才有了上述代码中的判断逻辑。

如果当前的block对象的类型为BaseContinuationImpl,则调用create方法,这里的create方法是编译器生成的类里的重写方法,它的内部就是通过我们传入的参数,创建并返回根据blcok生成的类的一个实例对象。

如果当前的block对象的类型不为BaseContinuationImpl,则需要通过createCoroutineFromSuspendFunction方法创建协程。这里假设lambda表达式的类型不是BaseContinuationImpl。

5.createCoroutineFromSuspendFunction方法

该方法用于在createCoroutineUnintercepted方法中使用,当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。

有两种情况会调用该方法创建协程:第一种情况是lambda表达式中调用了其他的挂起方法;第二种情况是挂起方法是通过Java实现的。

createCoroutineFromSuspendFunction方法的代码如下:

private inline fun <T> createCoroutineFromSuspendFunction(
   completion: Continuation<T>,
   crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
   val context = completion.context
   // 如果上下文为空
   return if (context === EmptyCoroutineContext)
       // 创建一个受限协程
       object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
           private var label = 0

override fun invokeSuspend(result: Result<Any?>): Any? =
               when (label) {
                   0 -> {
                       label = 1
                       result.getOrThrow()
                       block(this)
                   }
                   1 -> {
                       label = 2
                       result.getOrThrow()
                   }
                   else -> error("This coroutine had already completed")
               }
       }
   else // 不为空,则创建一个正常的协程
       object : ContinuationImpl(completion as Continuation<Any?>, context) {
           private var label = 0

override fun invokeSuspend(result: Result<Any?>): Any? =
               when (label) {
                   0 -> {
                       label = 1
                       result.getOrThrow()
                       block(this)
                   }
                   1 -> {
                       label = 2
                       result.getOrThrow()
                   }
                   else -> error("This coroutine had already completed")
               }
       }
}

受限协程是指协程在运行过程中的,只能调用协程作用域中提供的挂起方法发生挂起,其他挂起方法不能调用,因为在挂起方法会对续体进行拦截,可能导致后续代码的执行变得无法预测。

典型的例子就是sequence方法,它创建的协程就是受限协程,只能通过调用yield方法或者yieldAll方法才能发生挂起。由于受限协程中不能进行协程调度,因此其上下文是空的。

这里launch方法的上下文有一个默认调度器,因此会创建一个ContinuationImpl对象。

到这里,协程完成了创建。

二.协程的启动

再次回到startCoroutineCancellable方法,当调用createCoroutineUnintercepted创建好协程后,会调用intercepted方法,代码如下:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
   (this as? ContinuationImpl)?.intercepted() ?: this

intercepted方法是Continuation接口的扩展方法,内部调用了ContinuationImpl类的intercepted方法。

1.ContinuationImpl类

internal abstract class ContinuationImpl(
   completion: Continuation<Any?>?,
   private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
   constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
   public override val context: CoroutineContext
       get() = _context!!
   @Transient
   private var intercepted: Continuation<Any?>? = null
   // 如果没有缓存,则从上下文中获取 * ,调用interceptContinuation进行拦截,
   // 将拦截的续体保存到全局变量
   public fun intercepted(): Continuation<Any?> =
       intercepted
           ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
               .also { intercepted = it }
   protected override fun releaseIntercepted() {
       val intercepted = intercepted
       if (intercepted != null && intercepted !== this) {
           context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
       }
       this.intercepted = CompletedContinuation // just in case
   }
}

这里的ContinuationInterceptor指的就是在newCoroutineContext方法中传入的Dispatchers.Default调度器。CoroutineDispatcher类的interceptContinuation方法的代码如下:

public abstract class CoroutineDispatcher :
   AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
   // 将续体包裹成DispatchedContinuation,并传入当前调度器
   public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
       DispatchedContinuation(this, continuation)
   ...
}

2.resumeCancellableWith方法

再次回到startCoroutineCancellable方法,当调用intercepted方法进行拦截后,会调用resumeCancellableWith方法,代码如下:

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
   is DispatchedContinuation -> resumeCancellableWith(result)
   else -> resumeWith(result)
}

由于当前的Continuation对象的类型为DispatchedContinuation,因此调用DispatchedContinuation类的resumeCancellableWith方法,代码如下:

internal class DispatchedContinuation<in T>(
   @JvmField val dispatcher: CoroutineDispatcher,
   @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ * IC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
   ...
   @Suppress("NOTHING_TO_INLINE")
   inline fun resumeCancellableWith(result: Result<T>) {
       val state = result.toState()
       // 是否进行调度
       if (dispatcher.isDispatchNeeded(context)) {
           _state = state
           resumeMode = MODE_CANCELLABLE
           // 进行调度
           dispatcher.dispatch(context, this)
       } else {// Dispatcher.Unconfined调度器会走这里
           executeUnconfined(state, MODE_CANCELLABLE) {
               // 协程未被取消
               if (!resumeCancelled()) {
                   // 恢复执行
                   resumeUndispatchedWith(result)
               }
           }
       }
   }
   // 恢复执行前判断协程是否已经取消执行
   @Suppress("NOTHING_TO_INLINE")
   inline fun resumeCancelled(): Boolean {
       // 获取当前的协程任务
       val job = context[Job]
       // 如果不为空且不活跃
       if (job != null && !job.isActive) {
           // 抛出异常
           resumeWithException(job.getCancellationException())
           return true
       }
       return false
   }
   @Suppress("NOTHING_TO_INLINE")
   inline fun resumeUndispatchedWith(result: Result<T>) {
       // 该方法在指定的上下文中执行,在执行后同步协程上下文变化
       withCoroutineContext(context, countOrElement) {
           // 调用续体的resumeWith方法
           continuation.resumeWith(result)
       }
   }
   ...
}
// Dispatchers.Unconfined模式下的调度
private inline fun DispatchedContinuation<*>.executeUnconfined(
   contState: Any?, mode: Int, doYield: Boolean = false,
   block: () -> Unit
): Boolean {
   // 从ThreadLocal中获取EventLoop
   val eventLoop = ThreadLocalEventLoop.eventLoop
   // doYield表示是否正在让出执行
   // 如果正在让出执行,并且执行队列还是空的,说明不需要执行,返回false
   if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
   // 如果EventLoop当前还在被Unconfined调度器使用
   return if (eventLoop.isUnconfinedLoopActive) {
       _state = contState
       resumeMode = mode
       // 向队列中添加当前的任务
       eventLoop.dispatchUnconfined(this)
       // 返回 true
       true
   } else {
       // 重新运行EventLoop
       runUnconfinedEventLoop(eventLoop, block = block)
       // 返回false
       false
   }
}

runUnconfinedEventLoop方法是一个扩展方法,用于启动EventLoop,代码如下:

internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
   eventLoop: EventLoop,
   block: () -> Unit
) {
   // 引用计数+1
   eventLoop.incrementUseCount(unconfined = true)
   try {
       // 先执行当前的任务
       block()
       // 循环分发任务
       while (true) {
           // 全部执行完毕,则退出分发
           if (!eventLoop.processUnconfinedEvent()) break
       }
   } catch (e: Throwable) {
       handleFatalException(e, null)
   } finally {
       // 引用计数+1
       eventLoop.decrementUseCount(unconfined = true)
   }
}

Dispatchers.Default调度器与Dispatchers.Unconfined调度器的调度逻辑基本都相同,最终都是调用Contination对象的resumeWith方法,同时传入Result对象作为参数。

这里的Contination是createCoroutineUnintercepted方法创建的继承ContinuationImpl的匿名内部类对象。Result是resumeCancellableWith方法传入的Result.success(Unit)对象,因为首次启动,所以传入类型为Unit。

调用匿名内部类的resumeWith方法,实际调用的是父类BaseContinuationImpl的resumeWith方法。

3.BaseContinuationImpl类

BaseContinuationImpl类的resumeWith方法的代码如下:

internal abstract class BaseContinuationImpl(
   public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
   public final override fun resumeWith(result: Result<Any?>) {
       var current = this
       var param = result
       // 循环
       while (true) {
           // 用于debug
           probeCoroutineResumed(current)
           // current环境下
           with(current) {
               // completion用于续体执行完的回调,为空,则抛出异常
               // 这里的completion就是一开始创建的StandaloneCoroutine对象
               val completion = completion!!
               // 获取执行后的结果
               val outcome: Result<Any?> =
                   try {
                       // 核心执行
                       val outcome = invokeSuspend(param)
                       // 如果返回值为COROUTINE_SUSPENDED,说明协程挂起,退出循环
                       if (outcome === COROUTINE_SUSPENDED) return
                       // 返回结果成功
                       Result.success(outcome)
                   } catch (exception: Throwable) {
                       // 返回结果失败
                       Result.failure(exception)
                   }
               // 释放拦截的续体,状态机终止
               releaseIntercepted()
               // 这里没有直接调用resume,而是通过循环代替递归
               // 这也是resumeWith方法声明为final的原因
               if (completion is BaseContinuationImpl) {
                   // 这种情况一般为多个suspend方法按顺序执行
                   // 等待下一次循环
                   current = completion
                   param = outcome
               } else {
                   // 返回结果
                   completion.resumeWith(outcome)
                   return
               }
           }
       }
   }
   protected abstract fun invokeSuspend(result: Result<Any?>): Any?
   protected open fun releaseIntercepted() {
       // does nothing here, overridden in ContinuationImpl
   }
   public open fun create(completion: Continuation<*>): Continuation<Unit> {
       throw UnsupportedOperationException("create(Continuation) has not been overridden")
   }
   public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
       throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
   }
    ...
}

4.invokeSuspend方法

在上述代码中,resumeWith方法内部调用了invokeSuspend方法,这里的invokeSuspend方法实际就是createCoroutineFromSuspendFunction方法中创建的匿名内部类的invokeSuspend方法。匿名内部类的代码如下:

object : ContinuationImpl(completion as Continuation<Any?>, context) {
   // 初始状态
   private var label = 0
   override fun invokeSuspend(result: Result<Any?>): Any? =
           when (label) {
               0 -> {
                   label = 1
                   // 先去获取一次结果,如果有异常,则直接抛出,避免执行
                   // 比如在调度器中,如果发现协程已经取消,
                   // 则调用resumeWithException方法,在这里直接被抛出
                   result.getOrThrow()
                   // 把当前续体传入,执行协程
                   // 可能发生挂起
                   block(this)
               }
               1 -> {
                   // 如果协程发生了挂起,那么恢复挂起后会走到这里
                   label = 2
                   // 获取最终的执行结果
                   result.getOrThrow()
               }
               else -> error("This coroutine had already completed")
           }
}

三.协程的挂起与恢复

通过上述代码的分析,协程的挂起实际就是在协程返回结果时返回一个COROUTINE_SUSPENDED对象,在收到COROUTINE_SUSPENDED结果后直接返回,等待被再次调用resumeWith恢复。

COROUTINE_SUSPENDED对象定义在枚举类CoroutineSingletons中,代码如下:

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

该枚举类代表了协程的三个状态,协程在创建后状态为UNDECIDED,如果执行过程中发生挂起,则状态变为COROUTINE_SUSPENDED,最后挂起恢复后状态变为RESUMED。

而协程的恢复实际就是在挂起方法执行完成后,通过调用协程执行时传入的续体的resumeWith方法,恢复后续代码的执行。

来源:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126087741

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com