Kotlin 协程异步热数据流的设计与使用讲解
作者:李萧蝶 发布时间:2022-09-21 18:30:32
一.异步冷数据流
在Kotlin协程:协程的基础与使用中,通过使用协程中提供的flow方法可以创建一个Flow对象。这种方法得到的Flow对象实际上是一个异步冷数据流,代码如下:
private suspend fun test() {
val flow = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
GlobalScope.launch {
// 触发flow执行
flow.collect {
Log.d("liduo", "test1: $it")
}
}
GlobalScope.launch {
// 再次触发flow执行
flow.collect {
Log.d("liduo", "test2: $it")
}
}
}
在上面的代码中,通过调用flow方法,构建了一个名为flow对象,并对flow对象异步执行了两次。每次都会打印出1、2、3、4,然后结束执行。无论谁在前谁在后,无论执行多少次,得到的结果都是相同的,这就是异步冷数据流的一个特点。
二.异步热数据流
既然有冷数据流,那就一定有热数据流。在协程中提供了MutableSharedFlow方法来创建异步热数据流。相比于异步冷数据流,异步热数据流一般在类似广播订阅的场景中使用。
1.异步热数据流的设计
在异步热数据流中,核心接口的继承关系如下图所示:
1)SharedFlow接口
SharedFlow接口继承自Flow接口,代码如下:
public interface SharedFlow<out T> : Flow<T> {
// 用于保存最近的已经发送的数据
public val replayCache: List<T>
}
replay缓存:每个SharedFlow类型的对象会将最新发射的数据保存到replayCache中,每一个新的订阅者会先从replayCache中获取数据,然后再获取最新发射的数据。
订阅过程:在SharedFlow中,每个FlowCollecter类型的对象都被称为订阅者。调用SharedFlow类型对象的collect方法会触发订阅。正常情况下,订阅不会自动结束,但订阅者可以取消订阅,当订阅者所在的协程被取消时,订阅过程就会取消。
操作符使用:对于大部分终端操作符,比如:toList方法,当对SharedFlow类型的对象使用这些操作符将永远不会结束或完成变换(toList用于将上游发射的所有数据保存到列表中,并返回列表)。对于部分用于截断流的操作符,比如:take方法,当对SharedFlow类型的对象使用这些操作符可以完成变换(take用于截取指定数量的上游流发射的数据)。当对SharedFlow类型的对象使用flowOn操作符、cancellable操作符,或使用指定参数为RENDEZVOUS的buffer操作符是无效的。
SharedFlow并发: SharedFlow中所有的方法都是线程安全的,并且可以在多协程并发的场景中使用且不必额外加锁。
冷流转换热流:对于一个冷流,可以通过调用shareIn方法,转换为一个热流。
SharedFlow与BroadcastChannel的区别:从概念上讲,SharedFlow与BroadcastChannel很相似,但二者也有很大的差别,推荐使用SharedFlow,SharedFlow设计的目的就是要在未来替代BroadcastChannel:
SharedFlow更简单,不需要实现一堆与Channel相关的接口。
SharedFlow支持配置replay缓存与缓存溢出策略。
SharedFlow清楚地划分了只读的SharedFlow和可读可写的SharedFlow。
SharedFlow不能关闭,也不能表示失败,因此如果需要,所有的错误与完成信号都应该具体化。
2)MutableSharedFlow接口
MutableSharedFlow接口继承自SharedFlow接口与FlowCollector接口,并在此基础上定义了两个方法与一个常量,代码如下:
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
// 该方法用于尝试发射一个数据,
// 当返回true时表示发射成功,返回false时,表示缓存空间不足,需要挂起。
public fun tryEmit(value: T): Boolean
// 该常量表示当前SharedFlow的订阅者的数量,
// 该常量是一个状态流StateFlow,也是一个热流,当其中数值发生变化时会进行回调通知
public val subscriptionCount: StateFlow<Int>
// 用于清空replayCache
// 在调用该方法之前老的订阅者,可以继续收到replaycache中的缓存数据,
// 在调用该方法之后的新的订阅者,只能收到emit方法发射的新数据
@ExperimentalCoroutinesApi
public fun resetReplayCache()
}
2.异步热数据流的使用
1)MutableSharedFlow方法
在协程中,可以通过调用MutableSharedFlow方法创建一个MutableSharedFlow接口指向的对象,代码如下:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
其中构造方法中三个参数的含义如下:
replay:表示新订阅的接收者可以收到的最近已经发射的数据的数量,默认为0。
extraBufferCapacity:表示除replay外,当发射速度大于接收速度时数据可缓存的数量,默认为0。
onBufferOverflow:表示当缓存已满,数据即将溢出时的数据的处理策略,默认为SUSPEND。
当创建MutableSharedFlow类型的对象时,可以通过参数replay确定SharedFlow接口中定义的replayCache的最大容量,通过参数extraBufferCapacity设置一个不包括replay大小的缓存数量。replayCache本质上也是缓存的一部分,因此extraBufferCapacity与replay共同决定了缓存的大小。
对于处理数据慢的订阅者,可以通过从缓存中获取数据,以此来避免发射者的挂起。缓存的数量大小决定了数据处理快的订阅者与数据处理慢的订阅者之间的延迟程度。
当使用默认的构造方法创建MutableSharedFlow类型的对象时,它的缓存数量为0。当调用它的emit方法时会直接挂起,直到所有的订阅者都处理完当前emit方法发送的数据,才会恢复emit方法的挂起。如果MutableSharedFlow类型的对象没有订阅者,则调用emit方法会直接返回。
2)使用示例
代码如下:
private suspend fun test() {
// 创建一个热流
val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)
// 启动一个协程,发射数据:1
// 由于有缓存,因此会被添加到缓存中,不会挂起
GlobalScope.launch {
flow.emit(1)
}
// 将MutableSharedFlow对象转换为SharedFlow对象
// SharedFlow对象不能调用emit方法,因此只能用于接收
val onlyReadFlow = flow.asSharedFlow()
// 接收者1
// 启动一个新协程
GlobalScope.launch {
// 订阅监听,当collect方法触发订阅时,会首先会调onSubscription方法
onlyReadFlow.onSubscription {
Log.d("liduozuishuai", "test0: ")
// 发射数据:3
// 向下游发射数据:3,其他接收者收不到
emit(3)
}.onEach {
// 处理接收的数据
Log.d("liduozuishuai", "test1: $it")
}.collect()
}
// 接收者2
// 启动一个新的协程
GlobalScope.launch {
// 触发并处理接收的数据
onlyReadFlow.collect {
Log.d("liduozuishuai", "test2: $it")
}
}
// 发送数据:2
GlobalScope.launch {
flow.emit(2)
}
}
对于上面的代码,接收者1会依次打印出:3、1、2,接收者2会依次打印出1、2。
来源:https://juejin.cn/post/7144646576949395486


猜你喜欢
- 本文实例讲述了Spring的组合注解和元注解原理与用法。分享给大家供大家参考,具体如下:一 点睛从Spring 2开始,为了相应JDK 1.
- 本文实例讲述了C#获取文件创建时间的方法。分享给大家供大家参考。具体如下:C#获取文件创建时间,主要用到了FileInfo的Creattio
- Bezier Curve算法是根据参数曲线方程来得到光滑曲线的一种算法,曲线方程的参数由控制点决定。其本质是由调和函数根据控制点插值而成,其
- 前言Genymotion 来自于 AndroVM 这个开源项目,基于 x86 和 VirtualBox,支持 OpenGL 加速,可以用于
- 进程同步用来实现程序并发执行时候的可再现性。一.进程同步及异步的概念1.进程同步:就是在发出一个功能调用时,在没有得到结果之前,该调用就不返
- 我们在j2ee当中,连接数据库的时候经常会用到properties配置文件,我们原来在eclipse或者myeclipse当中会在src文件
- 本文实例为大家分享了QT实现简单计算器功能的具体代码,供大家参考,具体内容如下效果图:新建工程,创建类MainWindow,基类是QMain
- 首先来看看以下程序将会打印出什么:class Dog { public static void bark
- 这篇文章主要介绍了Java等待唤醒机制线程通信原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 前言我又搞回笃NET啦!java之路真是命运多舛,好事多磨。不过,也许我已经进入无招胜有招,博取众家之长、融会贯通的地步了。对于WebApi
- 一、 lib文件的简介.lib是一种文件后缀,是Windows操作系统的库文件,有静态lib和动态lib之分:1)、静态lib文件
- 目录前言实现思路实现方法最简单的实现方法如果多线程乱入?线程安全的单例模式Lock版本静态构造器版本Lazy版本总结前言Singleton是
- 本文实例为大家分享了Java实现递归计算n的阶乘的具体代码,供大家参考,具体内容如下问题描述利用递归的思想实现阶乘的计算,以 n!为例(一)
- 本文实例为大家分享了Android实现注册界面的具体代码,供大家参考,具体内容如下LinearLayout 控制布局TextView 用于显
- 用servlet实现一个注册的小功能 ,后台获取数据。注册页面:注册页面代码 :<!DOCTYPE html><html&
- 一、串口连接的打开与关闭串口,即COM口,在.NET中使用 SerialPort 类进行操作。串口开启与关闭,是涉及慢速硬件的IO操作,频繁
- spring cloud gateway读取请求参数1. 我的版本:spring-cloud:Hoxton.RELEASEspring-bo
- 一、整合原理二、导包(41个)1.hibernate(1)hibernate/lib/required(2)hibernate/lib/jp
- 目录第一章 前言概述第01节 概述第02节 区别第二章 核心代码第01节 成员变量第02节 构造方法第三章 扩容操作第01节 扩容代码第一章
- 鉴于谷歌最新推出的Android Studio备受开发者的推崇,所以也跟着体验一下。一、介绍Android Studio Andr