Reactor 多任务并发执行且结果按顺序返回第一个
作者:???????六七十三 发布时间:2021-08-15 03:32:46
1 场景
调用多个平级服务,按照服务优先级返回第一个有效数据。
具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?
2 创建 service
2.1 创建基本接口和实体类
public interface TestServiceI {
Mono request();
}
提供一个 request 方法,返回一个 Mono 对象。
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
private String name;
}
2.2 创建 service 实现
@Slf4j
public class TestServiceImpl1 implements TestServiceI {
@Override
public Mono request() {
log.info("execute.test.service1");
return Mono.fromSupplier(() -> {
try {
System.out.println("service1.threadName=" + Thread.currentThread().getName());
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "";
})
.map(name -> {
return new TestUser(name);
});
}
}
第一个 service 执行耗时 500ms。返回空对象;
创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。
继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。
3 主体方法
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
TestServiceI testServiceImpl4 = new TestServiceImpl4();
TestServiceI testServiceImpl5 = new TestServiceImpl5();
TestServiceI testServiceImpl6 = new TestServiceImpl6();
List<TestServiceI> serviceIList = new ArrayList<>();
serviceIList.add(testServiceImpl4);
serviceIList.add(testServiceImpl5);
serviceIList.add(testServiceImpl6);
// 执行 service 列表,这样有多少个 service 都可以
Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
.map(service -> {
return service.request();
});
// flatMap(或者flatMapSequential) + map 实现异常继续下一个执行
Flux flux = monoFlux.flatMapSequential(mono -> {
return mono.map(user -> {
TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
return testUser;
}
// null 在 reactor 中是异常数据。
return null;
})
.onErrorContinue((err, i) -> {
log.info("onErrorContinue={}", i);
});
});
Mono mono = flux.elementAt(0, Mono.just(""));
Object block = mono.block();
System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));
}
1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:Reactor中的onErrorContinue 和 onErrorResume
3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。
执行输出:
20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 执行耗时ms:2895
1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。
总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?
4 实现异步
4.1 subcribeOn 实现异步
修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())
如下:
@Slf4j
public class TestServiceImpl1 implements TestServiceI {
@Override
public Mono request() {
log.info("execute.test.service1");
return Mono.fromSupplier(() -> {
try {
System.out.println("service1.threadName=" + Thread.currentThread().getName());
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "";
})
//增加subscribeOn
.subscribeOn(Schedulers.boundedElastic())
.map(name -> {
return new TestUser(name);
});
}
}
再次执行输出如下:
21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 执行耗时ms:1242
1、发现具体实现 sleep 的线程都不是 main 线程,而是
boundedElastic
;2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。
4.2 CompletableFuture 实现异步
修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。
@Slf4j
public class TestServiceImpl1 implements TestServiceI{
@Override
public Mono request() {
log.info("execute.test.service1");
CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("service1.threadName=" + Thread.currentThread().getName());
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "testname1";
});
return Mono.fromFuture(uCompletableFuture).map(name -> {
return new TestUser(name);
});
}
}
执行返回如下:
21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 执行耗时ms:1238
1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
2、最终耗时和方法1基本差不多。
来源:https://segmentfault.com/a/1190000042368568


猜你喜欢
- 在开发 Web 项目的时候,经常需要过滤器来处理一些请求,包括字符集转换什么的,记录请求日志什么的等等。在之前的 Web 开发中,我们习惯把
- 在日常工作中,我们往往只关注 Java 内存使用情况,这主要是因为 Java 内存分析相关的工具比较多。与之不同的是,图片内存分析的工具比较
- 本文所需的数据库初始文件,Hibernate常用操作的完整示例代码(包含所有Hibernate操作所需jar文件)提供下载学习:http:/
- Spring boot 2.1.X整合Elasticsearch最新版的一处问题新版本的Spring boot 2的spring-boot-
- 在application.xml加上以下配置mybatis-plus.configuration.map-underscore-to-cam
- 一、概述近期注意到QQ新版使用了沉浸式状态栏,ok,先声明一下效果图:恩,接下来正题。首先只有大于等于4.4版本支持这个半透明状态栏的效果,
- using System;using System.Collections.Generic;using System.Linq; using
- WebService是一个SOA(面向服务的编程)的架构,它是不依赖于语言,不依赖于平台,可以实现不同的语言间的相互调用,通过Interne
- 微信是现在比较流行的应用了,在各大安卓市场几乎都是名列前茅了。说实话不得不羡慕腾讯庞大的用户群体,只要腾讯敢做,就会有很多人去用。废话不多说
- 一、获取程序集版本 程序代码 label版本.Text = System.Reflection.Assembly.GetExecutingA
- public static Expression<Func<T, bool>> GetSearchExpressio
- 上一节我们了解了Lock接口的一些简单的说明,知道Lock锁的常用形式,那么这节我们正式开始进入JUC锁(java.util.concurr
- Java 回调函数概要: 所谓回调,就是客户程序C调用服务程序S中的某个函数A,然后S又在某个时候反过来调用C中的某个
- 创建SpringBoot项目可以通过两种方式1、通过访问:https://start.spring.io/,SpringBoot的官方网站进
- 开发一个需要常住后台的App其实是一件非常头疼的事情,不仅要应对国内各大厂商的ROM,还需要应对各类的安全管家...虽然不断的研究各式各样的
- java中 String和StringBuffer的区别实例详解String: &
- 首先,定义TabHost的布局文件:<?xml version="1.0" encoding="utf-
- 当一个Java开发人员加入到Groovy的开发之旅的时候,他/她经常带着Java思想去思考,并逐步地学习Groovy,每次学习一个特性,这会
- 前言定时/计划功能在Java应用的各个领域都使用得非常多,比方说Web层面,可能一个项目要定时采集话单、定时更新某些缓存、定时清理一批不活跃
- 本文实例讲述了C#操作ftp类。分享给大家供大家参考。具体如下:using System;using System.Collections.