Kotlin协程与并发深入全面讲解
作者:且听真言 发布时间:2022-09-11 12:28:25
协程与并发
Kotlin协程是基于线程执行的。经过一层封装以后,Kotlin协程面对并发,处理方式与Java不同。
在java的世界里,并发往往是多个线程一起工作,存在共享的变量。需要处理好同步问题。要避免把协程与线程的概念混淆。
runBlocking {
var i = 0
launch(Dispatchers.Default) {
repeat(1000) {
i++
}
}
delay(1000L)
println(i)
}
Log
1000
Process finished with exit code 0
上述代码中没有任何并发任务,launch创建了一个协程,所有的计算都发生在协程中。所以不需要考虑同步问题。
1.协程并发问题
多个协程并发执行的例子:
runBlocking {
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
i++
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
9933
Process finished with exit code 0
上述代码中,创建了10个协程任务,每个协程任务都会工作在Default线程池中,这10个协程任务对i进行1000次自增操作,但是因为10个协程分别运行在不同的线程之前,且共享一个变量,所以会产生同步问题。
2.协程处理并发的手段
在Java中的同步手段有:synchronized、Atomic、Lock等;
使用@Synchronized注解或者synchronized(){}代码块
runBlocking {
var i = 0
val lock = Any()
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
synchronized(lock) {
i++
}
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
10000
Process finished with exit code 0
如何在上面的synchronized代码块中加入挂起函数,则发现会报错。
如下:
runBlocking {
suspend fun prepare() {
}
var i = 0
val lock = Any()
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
synchronized(lock) {
prepare()
i++
}
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
所以可以发现不能在synchronized{}当中调用挂起函数,编译器会报错。因为挂起函数会被翻译成带有Continuation的异步函数,造成synchronized代码块无法同步处理。
协程并发思路
单线程并发
在Kotlin协程中可以实现单线程并发。
runBlocking {
suspend fun getResult1(): String {
printlnCoroutine("Start getResult1")
delay(1000L)
printlnCoroutine("End getResult1")
return "Result1"
}
suspend fun getResult2(): String {
printlnCoroutine("Start getResult2")
delay(1000L)
printlnCoroutine("End getResult2")
return "Result2"
}
suspend fun getResult3(): String {
printlnCoroutine("Start getResult3")
delay(1000L)
printlnCoroutine("End getResult3")
return "Result3"
}
val results = mutableListOf<String>()
val time = measureTimeMillis {
val result1 = async {
getResult1()
}
val result2 = async {
getResult2()
}
val result3 = async {
getResult3()
}
results.add(result1.await())
results.add(result2.await())
results.add(result3.await())
}
println("Time:$time")
println(results)
}
fun printlnCoroutine(any: Any?) {
println("" + any + ";Thread:" + Thread.currentThread().name)
}
Log
Start getResult1;Thread:main @coroutine#2
Start getResult2;Thread:main @coroutine#3
Start getResult3;Thread:main @coroutine#4
End getResult1;Thread:main @coroutine#2
End getResult2;Thread:main @coroutine#3
End getResult3;Thread:main @coroutine#4
Time:1028
[Result1, Result2, Result3]
Process finished with exit code 0
上面代码启动了三个协程,它们之间是并发执行的,每个协程耗时1000ms,总耗时1000多毫秒,而且这几个协程都运行在main线程上。
所以 可以考虑将i++逻辑分发到单线程之上。
runBlocking {
val coroutineDispatcher = Executors.newSingleThreadExecutor {
Thread(it, "MySingleThread").apply {
isDaemon = true
}
}.asCoroutineDispatcher()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(coroutineDispatcher) {
repeat(1000) {
i++
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
10000
Process finished with exit code 0
上述代码把所有协程任务分发到单独的线程中执行,但这10个协程是并发执行的。
Mutex
在java中,Lock之类的同步锁是阻塞式的,而Kotlin提供了非阻塞式的锁:Mutex。
runBlocking {
val mutex = Mutex()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
mutex.lock()
i++
mutex.unlock()
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
Log
10000
Process finished with exit code 0
上述代码使用mutex.lock()、 mutex.unlock()包裹同步计算逻辑,实现多线程同步。Mutex 对比 JDK 当中的锁,最大的优势就在于支持挂起和恢复。
public interface Mutex {
public val isLocked: Boolean
public fun tryLock(owner: Any? = null): Boolean
public suspend fun lock(owner: Any? = null;
@Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " +
"For additional details please refer to #2794") // WARNING since 1.6.0
public val onLock: SelectClause2<Any?, Mutex>
public fun holdsLock(owner: Any): Boolean
public fun unlock(owner: Any? = null)
}
Mutex 是一个接口,它的 lock() 方法其实是一个挂起函数。而这就是实现非阻塞式同步锁的根本原因。
但是上述代码中对于 Mutex 的使用其实是错误的,会存在问题。如果代码在 mutex.lock()、mutex.unlock() 之间发生异常,从而导致 mutex.unlock() 无法被调用。这个时候,整个程序的执行流程就会一直卡住,无法结束。看下面代码:
runBlocking {
val mutex = Mutex()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
mutex.lock()
i++
i/0
mutex.unlock()
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
如何解决?使用mutex.withLock{}。
代码入下:
runBlocking {
val mutex = Mutex()
var i = 0
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
mutex.withLock {
i++
}
}
}
jobs.add(job)
}
jobs.joinAll()
println(i)
}
10000
Process finished with exit code 0
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}
lock(owner)
try {
return action()
} finally {
unlock(owner)
}
}
withLock{} 的本质,其实是在 finally{} 当中调用了 unlock()。
Actor
Actor,它本质上是基于 Channel 管道消息实现的。
sealed class Msg
object AddMsg : Msg()
class ResultMsg(val result: CompletableDeferred<Int>) : Msg()
fun testCoroutinueConcurrent10() {
runBlocking {
suspend fun addActor() = actor<Msg> {
var counter = 0
for (msg in channel) {
when (msg) {
is AddMsg -> counter++
is ResultMsg -> msg.result.complete(counter)
}
}
}
val actor = addActor()
val jobs = mutableListOf<Job>()
repeat(10) {
val job = launch(Dispatchers.Default) {
repeat(1000) {
actor.send(AddMsg)
}
}
jobs.add(job)
}
jobs.joinAll()
val deferred = CompletableDeferred<Int>()
actor.send(ResultMsg(deferred))
val result = deferred.await()
actor.close()
println(result)
}
}
Log
10000
Process finished with exit code 0
addActor() 挂起函数,它其实调用了 actor() 这个高阶函数。而这个函数的返回值类型其实是 SendChannel。由此可见,Kotlin 当中的 Actor 其实就是 Channel 的简单封装。Actor 的多线程同步能力都源自于 Channel。这里,我们借助密封类定义了两种消息类型,AddMsg、ResultMsg,然后在 actor{} 内部,我们处理这两种消息类型,如果我们收到了 AddMsg,则计算“i++”;如果收到了 ResultMsg,则返回计算结果。而在 actor{} 的外部,我们则只需要发送 10000 次的 AddMsg 消息,最后再发送一次 ResultMsg,取回计算结果即可。Actor 本质上是基于 Channel 管道消息实现的。
避免共享可变状态
runBlocking {
val deferreds = mutableListOf<Deferred<Int>>()
repeat(10) {
val deferred = async(Dispatchers.Default) {
var i = 0
repeat(1000) {
i++
}
return@async i
}
deferreds.add(deferred)
}
var result = 0
deferreds.forEach {
result += it.await()
}
println(result)
}
Log
10000
Process finished with exit code 0
在每一个协程当中,都有一个局部的变量 i,同时将 launch 都改为了 async,让每一个协程都可以返回计算结果。这种方式,相当于将 10000 次计算,平均分配给了 10 个协程,让它们各自计算 1000 次。这样一来,每个协程都可以进行独立的计算,然后我们将 10 个协程的结果汇总起来,最后累加在一起。
runBlocking {
val result = (1..10).map {
async(Dispatchers.Default) {
var i = 0
repeat(1000) {
i++
}
return@async i
}
}.awaitAll()
.sum()
println(result)
}
Log
10000
Process finished with exit code 0
来源:https://blog.csdn.net/zhangying1994/article/details/127587338


猜你喜欢
- Spring框架提供了事务管理的标准实现,且可以通过注解或者XML文件的方式声明和配置事务。通过异步事件的方式解耦服务调用,可以提高程序的响
- 前言本文主要给大家介绍了关于利用Spring Data MongoDB持久化文档数据的相关内容,分享出来供大家参考学习,下面话不多说了,来一
- 字节流和字符流对于文件必然有读和写的操作,读和写就对应了输入和输出流,流又分成字节和字符流。1.从对文件的操作来讲,有读和写的操作——也就是
- 关于 Android 实现 iOS 上的滚轮选择效果的控件,到 github 上一搜一大堆,之所以还要造这个轮子,目的是为了更好的学习自定义
- 前言:在 Java 语言中,保证线程安全性的主要手段是加锁,而 Java 中的锁主要有两种:synchronized 和 Lock,我们今天
- 本文实例讲述了C#实现将浮点数表示的货币数量以汉字大写形式输出的方法。分享给大家供大家参考。具体如下:1.函数代码注:本段代码中能转化的最大
- 如果项目中要用到数据库,铁定要用到分页排序。之前在做数据库查询优化的时候,通宵写了以下代码,来拼接分页排序的SQL语句 /// <su
- Bezier曲线的形状是通过一组多边折线(特征多边形)的各顶点唯一地定义出来的。在这组顶点中:(1)只有第一个顶点和最后一个顶点在曲线上;(
- 一丶先引入上传下载的lib二丶上传的的servletpackage com.test.action;import java.io.File;
- 实现方案:我们直接参考实例代码:private String pattern = "((http|ftp
- 一:回顾(1)c++中的string类是在面试中和笔试中经常考的题目; 工程代码免费下载 string类的自行实现(2)c++中的strin
- 目录一、前言二、正文2.1 注解2.1.1 注解1:@Target({ElementType.TYPE})2.1.2 注解2:@Retent
- 数学工具类Math,供大家参考,具体内容如下1. 概述java.util.Math类是数学相关的工具类,里面提供了大量的静态方法,完成与数学
- 本文实例讲述了java获取百度网盘真实下载链接的方法。分享给大家供大家参考。具体如下:目前还存在一个问题,同一ip在获取3次以后会出现验证码
- 通过这篇文章通过实例代码向大家介绍了Spring实例化bean的几种方法,接下来看看具体内容吧。1.使用类构造器实现实例化(bean的自身构
- 本文实例讲述了C#编程实现QQ界面的方法。分享给大家供大家参考,具体如下:步骤:1.新建一个页面,假如说叫VerticalMenu2.把ht
- 开发中需要传递变参,考虑使用 dynamic 还是 Dictionary(准确地说是Dictionary<string,object&
- 一、代理是Java常用的设计模式,代理类通过调用被代理类的相关方法,并对相关方法进行增强。加入一些非业务性代码,比如事务、日志、报警发邮件等
- 1、利用延迟队列延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才
- 本文实例为大家分享了java桌球小游戏的具体代码,供大家参考,具体内容如下源码:import java.awt.*;import javax