Kotlin协程Channel特点及使用细节详解
作者:无糖可乐爱好者 发布时间:2021-06-03 08:12:34
在协程启动模式中已经知道async
是可以返回结果的,但是只返回一个,那么在复杂场景下就会不够用了,所以Channel
就出现了。
1.认识Channel
Channel
的意思是管道、通道,用图表示如下:
Channel
的左边是发送方,右边是接收方,中间则是消息,那么代码表示就是下面这样:
fun main() {
channelTest()
}
fun channelTest() = runBlocking {
val channel = Channel<Int>() //关键点①
launch {
for (i in 1..3) {
channel.send(i) //关键点②
logX("send: $i")
}
}
launch {
for (i in channel) { //关键点③
logX("receiver: $i")
}
}
logX("end")
}
//输出结果:
//================================
//end
//Thread:main @coroutine#1
//================================
//================================
//receiver: 1
//Thread:main @coroutine#3
//================================
//================================
//send: 1
//Thread:main @coroutine#2
//================================
//================================
//send: 2
//Thread:main @coroutine#2
//================================
//================================
//receiver: 2
//Thread:main @coroutine#3
//================================
//================================
//receiver: 3
//Thread:main @coroutine#3
//================================
//================================
//send: 3
//Thread:main @coroutine#2
//================================
上面的代码中启动了两个协程,一个发送,一个接收,还有几个关键点:
关键点①:通过
Channel
创建一个管道,其中泛型Int
表示发送的数据类型;关键点②:启动一个协程通过
send
发送数据,send
是一个挂起函数;关键点③:启动一个协程遍历
channel
打印出接收到的消息。
那么这里还有一个问题,在执行完上述代码后程序并没有终止,那要如何终止程序呢?
很简单,在发送完所有消息后调用close
方法即可。
launch {
for (i in 1..3) {
channel.send(i) //关键点②
logX("send: $i")
}
//修改点
// ↓
channel.close()
}
Channel
也是一种协程资源,用完后如果不关闭那么这个资源就会一直被占用。
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
...
}
CONFLATED -> {
...
}
UNLIMITED -> {
...
}
BUFFERED -> {
...
}
else -> {
...
}
}
Channel
中有三个参数:
capacity
: 代表管道的容量,默认值为RENDEZVOUS
,代表容量为0,除此之外还有三个类型:
CONFLATED:代表容量为1,新的数据会替代旧的数据;
UNLIMITED:代表无限容量;
BUFFERED:代表具备一定缓存的容量,默认情况下是64,具体容量由VM参数
kotlinx.coroutines.channels.defaultBuffer
决定。onBufferOverflow
: 代表缓冲策略,也就是当缓冲的容量满了之后要怎么做。默认值为SUSPEND
,表示在缓冲区溢出时挂起。除此之外还有两个类型:
DROP_OLDEST:在缓冲区溢出时删除最旧的值,向缓冲区添加新值,不要挂起;
DROP_LATEST:在缓冲区溢出时,立即删除正在添加到缓冲区的最新值(以便缓冲区内容保持不变),不要挂起。
onUndeliveredElement
: 它相当于一个异常处理回调。当管道中的某些数据没有被成功接收的时候,这个回调就会被调用
现在写个案例看一下capacity
在其他类型下的区别
/**
* Channel.CONFLATED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 4
/**
* Channel.UNLIMITED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
/**
* Channel.BUFFERED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.BUFFERED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
再看一下onBufferOverflow
在其他类型下的区别
/**
* capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST
* 缓冲区设置为3,缓冲区溢出时删除最旧的值,向缓冲区添加新值
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 2
//receiver: 3
//receiver: 4
/**
* capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST
* 缓冲区设置为3,缓冲区溢出时立即删除正在添加到缓冲区的最新值
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_LATEST
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//输出结果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
再看一下onUndeliveredElement
要如何使用
/**
* capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement
* 缓冲区设置为2,缓冲区溢出时立即删除正在添加到缓冲区的最新值
* 接收一个数据后取消接收其他数据
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_LATEST,
onUndeliveredElement = {
println("onUndeliveredElement: $it")
}
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
}
println("receive:${channel.receive()}")
channel.cancel()
}
//输出结果:
//send: 1
//send: 2
//send: 3
//send: 4
//receive:1
//onUndeliveredElement: 2
//onUndeliveredElement: 3
上面的代码容量设置为2,缓冲策略是删除正在添加到缓冲区的最新值,接收一个数据后立即取消接收其他数据,也就是说接收到了【send: 1】的数据【receive:1】,【send: 4】的数据被缓冲策略删除了,由于接收消息的同道已经被取消了那么【send: 2】和【send: 3】的数据就只能在异常中被处理,从输出结果就可以看到。
从上面的代码示例可以总结出它的应用场景:接收方很关心数据是否被消费,例如企业微信、钉钉的消息是否已读的状态,对于异常处理那块的场景就像是发送消息过程中消息没有被发送出去,那么接收方就无法接受到这个消息。
2.Channel使用中的细节
前面在使用Channel
时为了让程序终止在发送完成后调用了channel.close()
,但是这个很容易被忘记,忘记添加就会造成程序无法终止的问题,那么Produce
就诞生了,它是一个高阶函数。
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
for (i in 1..4) {
send(i)
}
}
launch {
for (i in channel) {
println("receive: $i")
}
}
println("end")
}
//输出结果:
//end
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//Process finished with exit code 0
可以看到没有加入close
代码就可以正常结束,上面发送了4条数据,那么我要是接收5条数据会不会有什么问题?
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
for (i in 1..4) {
send(i)
}
}
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("end")
}
//输出结果:
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//ClosedReceiveChannelException: Channel was closed
可以看到当我接收第5条数据的时候报出channel
被关闭的提示,也就是说produce
确实会在消息发送完毕后关闭通道。
业务开发中有可能我们确实需要对channel
发送的消息进行单独处理,那么也许并不知道具体发送了几条数据,如果接收数据数量超过发送数据数量就会出现错误,那有没有像isClose
这类的方法可以在接收前判断是否被关闭呢?有的,在Channel
中还有两个变量:
//如果该通道已通过调用[close]关闭,则返回' true '。这意味着调用[send]将导致异常。
public val isClosedForSend: Boolean
//如果通过在SendChannel端调用close关闭了此通道,
//并且已经接收到以前发送的所有项目,则返回true。
public val isClosedForReceive: Boolean
那么安全的调用channel.receive()
接收就可以这么写
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce(capacity = 3) {
(1..4).forEach {
send(it)
println("Send $it")
}
}
while (!channel.isClosedForReceive) {
println("receive: ${channel.receive()}")
}
println("end")
}
//输出结果:
//Send 1
//Send 2
//Send 3
//Send 4
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//end
但是这里会有一个问题,不定义capacity
的数量
fun produceTest() = runBlocking {
//变化在这里
//↓
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
while (!channel.isClosedForReceive) {
println("receive: ${channel.receive()}")
}
println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//
//ClosedReceiveChannelException: Channel was closed
可以看到send
发送的数据全部都被接收了,但是还是报出channel
被关闭的错误,原因在注释中已经写明:如果通过在SendChannel端调用close关闭了此通道,并且已经接收到以前发送的所有项目,则返回true。
这意味着调用receive将导致closereceivechannelexception。 所以channel.receive()
要慎用。可以用channel.consumeEach
代替
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
//变化在这里
channel.consumeEach {
println("receive: $it")
}
println("end")
}
//输出结果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//end
3.Channe的特点
Channel
主要你用来传递数据流的,这个数据流指的是多个数据组合形成别的流,与它形成鲜明对比的是async
、挂起函数。
数据流的传输,有发送就有接收,而Channel
是完全符合这一点的。发送与接收存在两种情况:
数据流的发送了但是还没有被接收,没有接收则不再进行发送消息,例如文件的传输;
数据流的发送了不管有没有被接收,都要继续发送消息,例如微信聊天。
Channel
符合第二个结论,无论发送的数据是否被消费或者说被接收,Channel
都会进行工作。我们来证明一下这个结论。
/**
* 消息容量为10,发送4条数据
* 无论消息是否被接收都会吧消息发送完毕
*/
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce(capacity = 10) {
(1..4).forEach {
send(it)
println("Send $it")
}
}
println("end")
}
//输出结果:
//end
//Send 1
//Send 2
//Send 3
//Send 4
/**
* 消息容量改为默认,默认值时0,发送4条数据
* Channel依旧是在工作的,只是说在调用send方法的时候
* 接收方还没有准备完毕且容量为0,所以会被挂起,程序一直无法退出
*/
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
println("end")
}
//输出结果:
//end
//程序没有结束
通过上面的代码引出一个结论:Channel
是“热” 的。不管接收方是否存在,Channel
是一定会工作的。类似于自来水厂向像居民提供水源,发电厂向居民提供电能。
来源:https://juejin.cn/post/7173833391165407268


猜你喜欢
- springBoot集成Elasticsearch 报错 Health check failed今天集成Elasticsearch 时启动报
- 并发与并行并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点
- spring batch简介spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环
- 一、select是什么select——>用于选择更快的结果。基于场景理解比如客户端要查询一个商
- 本文实例为大家分享了Android自定义Banner轮播效果展示的具体代码,供大家参考,具体内容如下自定义View布局<Relativ
- 安装JDK 向导进行相关参数设置。如图: 正在安装程序的相关功能,如图: 选择安装的路径,可以自定义,也可以默认路径。如图: 成功安装之
- 本文实例讲述了Java实现的最大匹配分词算法。分享给大家供大家参考,具体如下:全文检索有两个重要的过程:1分词2倒排索引我们先看分词算法目前
- 在学习使用Scroller之前,需要明白scrollTo()、scrollBy()方法。一、View的scrollTo()、scrollBy
- (鼠标放上去将一直显示,移开动画继续),提供normal和error两种边框。介绍:传统的确定,取消,OK,CANCAL之类的对话框太繁琐了
- 一、什么是反射机制 简单的来说,反射机制指的是程序在运行时能够获取自身的信息。在java中,只要给定类的名字,那么就可以通过反射机制来获得类
- 目标本文提供一种自定义注解,来实现业务审批操作的DEMO,不包含审批流程的配置功能。具体方案是自定义一个Aspect注解,拦截sevice方
- resultType和resultMap只能有一个成立,resultType是直接表示返回类型的,而resultMap则是对外部Result
- 修改\packages\apps\Camera\res\values\arrays.xml中的以下代码: <string-array
- 声明式事务很方便,尤其纯注解模式,仅仅几个注解就能控制事务了思考:这些注解都做了什么?好神奇!@EnableTransactionManag
- 本文实例讲述了获取Android系统唯一识别码的方法。分享给大家供大家参考。具体如下:在计算机上,我们习惯用MAC地址来标志一台计算机。在A
- 自定义控件的步骤:1 View的工作原理2 编写View类3 为View类增加属性4 绘制屏幕5 响应用户消息6 自定义回调函数java代码
- 在没讲.net如何随机生成汉字之前先给大家讲下汉字编码组成及原理。1、汉字编码原理到底怎么办到随机生成汉字的呢?汉字从哪里来的呢?是不是有个
- 手把手教你用C#开发Android应用程序的方法和流程摘要:用C#能开发RFID-android吗?C#真的能开发android程序吗?C#
- 一、IDEA下载idea、jdk、tomcat、maven下载地址请参考上一篇博客:https://blog.csdn.net/DwZ735
- 本文实例为大家分享了Flutter实现微信朋友圈功能的具体代码,供大家参考,具体内容如下今天给大家实现一下微信朋友圈的效果,下面是效果图下面