Kotlin select使用方法介绍
作者:且听真言 发布时间:2022-05-28 19:34:27
一、select是什么
select——>用于选择更快的结果。
基于场景理解
比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。
如何实现上述逻辑:
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product? {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val cacheInfo = getCacheInfo(productId)
if (cacheInfo != null) {
updateUI(cacheInfo)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
val latestInfo = getNetworkInfo(productId)
if (latestInfo != null) {
updateUI(latestInfo)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 113
001 : 8.8
Time cost: 324
上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。
用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val product = select<Product?> {
async {
getCacheInfo(productId)
}.onAwait {
it
}
async {
getNetworkInfo(productId)
}.onAwait {
it
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 134
Process finished with exit code 0
由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。
在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val cacheDeferred = async {
getCacheInfo(productId)
}
val latestDeferred = async {
getNetworkInfo(productId)
}
val product = select<Product?> {
cacheDeferred.onAwait {
it.copy(isCache = true)
}
latestDeferred.onAwait {
it.copy(isCache = false)
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
if (product != null && product.isCache) {
val latest = latestDeferred.await() ?: return@runBlocking
updateUI(latest)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 124
001 : 8.8
Time cost: 228
Process finished with exit code 0
如果是多个服务的缓存场景呢?
runBlocking {
val startTime = System.currentTimeMillis()
val productId = "001"
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getCacheInfo2(productId: String): Product {
delay(50L)
return Product(productId, 8.85)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val cacheDeferred = async {
getCacheInfo(productId)
}
val cacheDeferred2 = async {
getCacheInfo2(productId)
}
val latestDeferred = async {
getNetworkInfo(productId)
}
val product = select<Product?> {
cacheDeferred.onAwait {
it.copy(isCache = true)
}
cacheDeferred2.onAwait {
it.copy(isCache = true)
}
latestDeferred.onAwait {
it.copy(isCache = true)
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
if (product != null && product.isCache) {
val latest = latestDeferred.await()
updateUI(latest)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
Log
001 : 8.85
Time cost: 79
001 : 8.8
Time cost: 229
Process finished with exit code 0
select 代码模式,可以提升程序的整体响应速度。
二、select和Channel
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce {
send(1)
delay(200L)
send(2)
delay(200L)
send(3)
}
val channel2 = produce {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
channel1.consumeEach {
println(it)
}
channel2.consumeEach {
println(it)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
1
2
3
a
b
c
Time cost: 853
Process finished with exit code 0
上述代码串行执行,可以使用select进行优化。
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce {
send(1)
delay(200L)
send(2)
delay(200L)
send(3)
}
val channel2 = produce {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<Int>,
channel2: ReceiveChannel<String>
): Any {
return select<Any> {
if (!channel1.isClosedForReceive) {
channel1.onReceive {
it.also {
println(it)
}
}
}
if (!channel2.isClosedForReceive) {
channel2.onReceive {
it.also {
println(it)
}
}
}
}
}
repeat(6) {
selectChannel(channel1, channel2)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
1
a
2
b
3
c
Time cost: 574
Process finished with exit code 0
从代码执行结果可以发现程序的执行耗时有效减少。onReceive{} 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。
如果Channel1不生产数据了,程序会如何执行?
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(5000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceive {
it.also {
println(it)
}
}
channel2.onReceive {
it.also {
println(it)
}
}
}
repeat(3) {
selectChannel(channel1, channel2)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
Time cost: 570
Process finished with exit code 0
如果不知道Channel的个数,如何避免ClosedReceiveChannelException?
使用:onReceiveCatching{}
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(5000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceiveCatching {
it.getOrNull() ?: "channel1 is closed!"
}
channel2.onReceiveCatching {
it.getOrNull() ?: "channel2 is closed!"
}
}
repeat(6) {
val result = selectChannel(channel1, channel2)
println(result)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 584
Process finished with exit code 0
得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。
所以我们需要在6次repeat之后将channel关闭。
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(15000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceiveCatching {
it.getOrNull() ?: "channel1 is closed!"
}
channel2.onReceiveCatching {
it.getOrNull() ?: "channel2 is closed!"
}
}
repeat(6) {
val result = selectChannel(channel1, channel2)
println(result)
}
channel1.cancel()
channel2.cancel()
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 612
Process finished with exit code 0
Deferred、Channel 的 API:
public interface Deferred : CoroutineContext.Element {
public suspend fun join()
public suspend fun await(): T
public val onJoin: SelectClause0
public val onAwait: SelectClause1<T>
}
public interface SendChannel<in E>
public suspend fun send(element: E)
public val onSend: SelectClause2<E, SendChannel<E>>
}
public interface ReceiveChannel<out E> {
public suspend fun receive(): E
public suspend fun receiveCatching(): ChannelResult<E>
public val onReceive: SelectClause1<E>
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
}
当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。
通过 async 并发执行协程,也可以借助 select 得到最快的结果。
runBlocking {
suspend fun <T> fastest(vararg deferreds: Deferred<T>): T = select {
fun cancelAll() = deferreds.forEach {
it.cancel()
}
for (deferred in deferreds) {
deferred.onAwait {
cancelAll()
it
}
}
}
val deferred1 = async {
delay(100L)
println("done1")
"result1"
}
val deferred2 = async {
delay(200L)
println("done2")
"result2"
}
val deferred3 = async {
delay(300L)
println("done3")
"result3"
}
val deferred4 = async {
delay(400L)
println("done4")
"result4"
}
val deferred5 = async {
delay(5000L)
println("done5")
"result5"
}
val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
println(fastest)
}
Log
done1
result1
Process finished with exit code 0
来源:https://blog.csdn.net/zhangying1994/article/details/127485681


猜你喜欢
- 在Java编程中,使用private关键字修饰了某个成员,只有这个成员所在的类和这个类的方法可以使用,其他的类都无法访问到这个private
- 一、HandlerThread的介绍及使用举例  
- 效果图一、绘制圆环圆环故名思意,第一个首先绘制是圆环1:圆环绘制函数圆环APIpublic void drawArc (RectF oval
- java导出Excel通用方法的实例详解Java导出Excel通用方法,只需要一个list 集合。通用方法改进之处踊跃提出package o
- 写在前面本文讲解的是如何使用Spring动态配置文件,实现不同环境不同配置,灵活切换配置文件;以及讲述了如何使用 Maven 打包,然后上传
- 背景在使用Spring Boot Mvc的项目中,使用Long类型作为id的类型,但是当前端使用Number类型接收Long类型数据时,由于
- 前言在一些APP中我们可以看到一些存放标签的容器控件,和我们平时使用的一些布局方式有些不同,它们一般都可以自动适应屏幕的宽度进行布局,根据对
- 我们开发一个Spring Boot项目,肯定要导入许多的静态资源,比如css,js等文件如果我们是一个web应用,我们的main下会有一个w
- 一、安装Maven下载地址:https://maven.apache.org/download.cgi把下载的安装包解压tar -xvf a
- 动态数组ArrayList类在System.Collecions的命名空间下,所以使用时要加入System.Collecions命名空间,而
- 今天学习了Mybatis执行存储,感觉不是那么好用,可能是我没用习惯。我先在SQLSERVER创建存储alter procedure usp
- 上次老师跟大家分享了 cookie、session和token,今天给大家分享一下Java 8中的Stream API。Stream简介1、
- 这篇文章主要介绍了Java import导入及访问控制权限修饰符过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考
- mybatis中的#和$的区别1. #将传入的数据都当成一个字符串,会对自动传入的数据加一个双引号。如:order by #user_id#
- 微服务启动时报错2021-05-18 21:25:44.644 WARN 5452 — [tbeatExecutor-0
- c#中的CultureInfo类CultureInfo类位于System.Globalization命名空间内,这个类和命名空间许多人都不是
- 本文实例讲述了C#实现Base64处理的加密解密,编码解码。分享给大家供大家参考,具体如下:using System;using Syste
- ASP.NET MVC中进行分页的方式有多种,但在NuGet上使用最广泛的就是用PagedList、X.PagedList.Mvc进行分页。
- 1.要求输入10个整数,从大到小排序输出输入:2 0 3 -4 8 9 5 1 7 6输出:9 8 7 6 5 3 2 1 0 -4解决方法
- 本文是一个 Spring 扩展支持 SPEL 的简单模式,方便第三方通过 Spring 提供额外功能。简化版方式这种方式可以在任何能获取Ap