Kotlin协程Channel源码示例浅析
作者:wayne214 发布时间:2023-06-14 22:54:08
结论先行
Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。
Channel使用示例
fun main() = runBlocking {
logX("开始")
val channel = Channel<Int> { }
launch {
(1..3).forEach{
channel.send(it)
logX("发送数据: $it")
}
// 关闭channel, 节省资源
channel.close()
}
launch {
for (i in channel){
logX("接收数据: $i")
}
}
logX("结束")
}
示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。
Channel的源码
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {}
CONFLATED -> {}
UNLIMITED -> {}
else -> {}
}
可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.
首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:
UNLIMITED: Int = Int.MAX_VALUE,没有限量
CONFLATED: 容量为1,新的覆盖旧的值
BUFFERED: 添加缓冲容量,默认值是64,可以通过修改VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修改
接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是: SUSPNED、DROP_OLDEST以及DROP_LATEST
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
SUSPEND,当管道的容量满了以后,如果发送方还要继续发送,我们就会挂起当前的 send() 方法。由于它是一个挂起函数,所以我们可以以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复,有点像生产者-消费者模型
DROP_OLDEST,顾名思义,就是丢弃最旧的那条数据,然后发送新的数据,有点像LRU算法。
DROP_LATEST,丢弃最新的那条数据。这里要注意,这个动作的含义是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变。
最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。
综合这个参数使用一下
fun main() = runBlocking {
println("开始")
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("发送数据: $it")
}
// 关闭channel, 节省资源
channel.close()
}
launch {
for (i in channel){
println("接收数据: $i")
}
}
println("结束")
}
输出结果如下:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3
接收数据: 2
接收数据: 3
安全的从Channel中取数据
先看一个例子
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("发送: $it")
}
}
while (!channel.isClosedForReceive){
val i = channel.receive();
println("接收: $i")
}
输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。
推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("发送: $it")
}
}
channel.consumeEach {
println("接收:$it")
}
所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。
“热的数据流”从何而来?
先看一下代码
println("开始")
val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("发送数据: $it")
}
}
println("结束")
}
输出:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3
可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种“不管有没有接收方,发送方都会工作”的模式,就是我们将其认定为“热”的原因。
举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。
总的来说,不管接收方是否存在,Channel 的发送方一定会工作。
Channel能力的来源
通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。
这也是一种良好的设计思想,“对读取开放,对写入封闭”的开闭原则。
来源:https://juejin.cn/post/7169078590640750599
猜你喜欢
- 前言J.U.C是java包java.util.concurrent的简写,中文简称并发包,是jdk1.5新增用来编写并发相关的基础api。j
- 本文实例为大家分享了java实现小超市程序的具体代码,供大家参考,具体内容如下一.人物包1.顾客类package person;public
- 游戏介绍:拼图游戏是一款经典的益智游戏,游戏难度分为 简单、正常、困难 三种难度,分别对应3*3,4*4,5*5布局,游戏开始前图片被随机打
- 本文主要介绍了关于单例模式的一些问题,想学习C#单例模式的同学们可以看一看,还是有些帮助c#中的单例模式单例模式是指在设计一个类时,保证在运
- 在java程序开发中,ftp用的比较多,经常打交道,比如说向FTP服务器上传文件、下载文件,本文给大家介绍如何利用jakarta commo
- 前言大家对AOP应该都不陌生, 就算没有用过也肯定听说过,切面编程一直是一个热点的话题,AOP即Aspect Oriented Progra
- 本文实例讲述了Android编程实现的首页左右滑动切换功能。分享给大家供大家参考,具体如下:很多软件会选择左右滑动的主界面,实现方式也很多,
- 结构体有时候我们仅需要一个小的数据结构,类提供的功能多于我们需要的功能;考虑到性能原因,最好使用结构体。结构体是值类型,存储在栈中或存储为内
- 前台form 表单:设置method=post,enctype=multipart/form-data。struts2在原有的上传解析器继承
- 本文实例讲述了C#编程实现动态改变配置文件信息的方法。分享给大家供大家参考,具体如下:配置文件实际上就是一个XML文件,所以我们可以使用Xm
- 参考文献:https://www.jb51.net/article/232858.htm使用springboot 2 构建项目,调试代码的时
- 下面将源代码贴出。 public static class ChineseToPinYin { private static readonl
- 题主要区分清楚内码(internal encoding)和外码(external encoding)就好了。内码是程序内部使用的字符编码,特
- 需要添加引用,System.Configuration;写系统配置文件: Configuration cfa =
- 这篇文章主要介绍了java获取当前时间的四种方法代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的
- 步骤,如图所示:1.添加异步任务业务类package top.ytheng.demo.task;import java.util.concu
- 题目给出一个 32 位的有符号整数,你需要将这个整数中每位上的数字进行反转。示例 1:输入: 123输出: 321 示例
- IO的本质IO的作用就是从外部系统读取数据到java程序中,或者把java程序中输出的数据写回到外部系统。这里的外部系统可能是磁盘,网络流等
- 本文实例为大家分享了java顺时针打印矩阵的具体代码,供大家参考,具体内容如下github:剑指offer编程题 import j
- 在 C# 中,结构体是值类型数据结构。它使得一个单一变量可以存储各种数据类型的相关数据。struct 关键字用于创建结构体。结构体是用来代表