RxJava中多种场景的实现总结
作者:lluo2010 发布时间:2023-01-09 05:39:02
一、推迟执行动作
可以使用timer+map方法实现.代码如下:
Observable.timer(5, TimeUnit.MILLISECONDS).map(value->{
return doSomething();
}).subscribe(System.out::println);
}
二、推迟发送执行的结果
这种场景要求产生数据的动作是马上执行,但是结果推迟发送.这和上面场景的是不一样的.
这种场景可以使用Observable.zip
来实现.
zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。
对于各个observable相同位置的数据,需要相互等待,也就说,第一个observable第一个位置的数据产生后,要等待第二个observable第一个位置的数据产生,等各个Observable相同位置的数据都产生后,才能按指定规则进行组合.这真是我们要利用的.
zip有很多种声明,但大致上是一样的,就是传入几个observable,然后指定一个规则,对每个observable对应位置的数据进行处理,产生一个新的数据, 下面是其中一个最简单的:
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, final Func2<? super T1, ? super T2, ? extends R> zipFunction);
用zip实现推送发送执行结果如下:
Observable.zip(Observable.timer(5,TimeUnit.MILLISECONDS)
,Observable.just(doSomething()), (x,y)->y)
.subscribe(System.out::println));
三、使用defer在指定线程里执行某种动作
如下面的代码,虽然我们指定了线程的运行方式,但是doSomething()
这个函数还是在当前代码调用的线程中执行的.
Observable.just(doSomething())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->Utils.printlnWithThread(v.toString()););
通常我们采用下面的方法达到目的:
Observable.create(s->{s.onNext(doSomething());})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{
Utils.printlnWithThread(v.toString());
});
但其实我们采用defer也能达到相同的目的.
关于defer
defer 操作符与create、just、from等操作符一样,是创建类操作符,不过所有与该操作符相关的数据都是在订阅是才生效的。
声明:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory);
defer的Func0里的Observable是在订阅(subscribe)的时候才创建的.
作用:
Do not create the Observable until an Observer subscribes; create a fresh Observable on each subscription.
也就说observable是在订阅的时候才创建的.
上面的问题用defer实现:
Observable.defer(()->Observable.just(doSomething()))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
四、使用compose不要打断链式结构
我们经常看到下面的代码:
Observable.just(doSomething())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(v->{Utils.printlnWithThread(v.toString());
上面的代码中,subscribeOn(xxx).observeOn(xxx)
可能在很多地方都是一样的, 如果我们打算把它统一在某一个地方实现, 我们可以这么写:
private static <T> Observable<T> applySchedulers(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
但是这样每次我们需要调用上面的方法, 大致会像下面这样,最外面是一个函数,等于打破了链接结构:
applySchedulers(Observable.from(someSource).map(new Func1<Data, Data>() {
@Override public Data call(Data data) {
return manipulate(data);
}
})
).subscribe(new Action1<Data>() {
@Override public void call(Data data) {
doSomething(data);
}
});
可以使用compose操作符达到不打破链接结构的目的.
compose的申明如下:
public Observable compose(Transformer<? super T, ? extends R> transformer);
它的入参是一个Transformer接口,输出是一个Observable. 而Transformer实际上就是一个Func1<Observable<T>
, Observable<R>>
,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable.
简单的说,compose可以通过指定的转化方式(输入参数transformer),将原来的observable转化为另外一种Observable.
通过compose, 采用下面方式指定线程方式:
private static <T> Transformer<T, T> applySchedulers() {
return new Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
};
}
Observable.just(doSomething()).compose(applySchedulers())
.subscribe(v->{Utils.printlnWithThread(v.toString());
});
函数applySchedulers可以使用lambda表达式进一步简化为下面为:
private static <T> Transformer<T, T> applySchedulers() {
return observable->observable.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation());
}
五、按优先级使用不同的执行结果
上面这个标题估计没表达清楚我想表达的场景. 其实我想表达的场景类似于平常的获取网络数据场景:如果缓存有,从缓存获取,如果没有,再从网络获取.
这里要求,如果缓存有,不会做从网络获取数据的动作.
这个可以采用concat+first实现.
concat将几个Observable合并成一个Observable,返回最终的一个Observable. 而那些数据就像从一个Observable发出来一样. 参数可以是多个Observable,也可以是包含Observalbe的Iterator.
新的observable内的数据排列按原来concat里的observable顺序排列,即新结果内的数据是按原来的顺序排序的.
下面是上述需求的实现:
Observable.concat(getDataFromCache(),getDataFromNetwork()).first()
.subscribe(v->System.out.println("result:"+v));
//从缓存获取数据
private static Observable<String> getDataFromCache(){
return Observable.create(s -> {
//dosomething to get data
int value = new Random().nextInt();
value = value%2;
if (value!=0){
s.onNext("data from cache:"+value); //产生数据
}
//s.onError(new Throwable("none"));
s.onCompleted();
}
);
}
//从网络获取数据
private static Observable<String> getDataFromNetwork(){
return Observable.create(s -> {
for (int i = 0; i < 10; i++) {
Utils.println("obs2 generate "+i);
s.onNext("data from network:" + i); //产生数据
}
s.onCompleted();
}
);
}
上面的实现,如果getDataFromCache有数据, getDataFromNetwork这里的代码是不会执行的, 这正是我们想要的.
上面实现有几个需要注意:
1、有可能从两个地方都获取不到数据, 这种场景下使用first会抛出异常NoSuchElementException,如果是这样的场景,需要用firstOrDefault替换上面的first.
2、上面getDataFromCache()
里,如果没有数据,我们直接调用onCompleted,如果不调用onCompleted,而是调用onError,则上述采用concat是得不到任何结果的.因为concat在收到任何一个error,合并就会停止.所以,如果要用onError, 则需要用concatDelayError替代concat.concatDelayError
会先忽略error,将error推迟到最后在处理.
总结


猜你喜欢
- 方法● maxBy:获取流中最大元素;minBy:获取流中最小元素● joining:合并,将流中的元素,以字符串的形式拼接起来● summ
- 本文为大家分享两个实例,相信大家一定会喜欢。实例1:随机生成验证码图片并将之输出为一个png文件效果图:import java.awt.Co
- 本文实例讲述了C#执行SQL事务用法。分享给大家供大家参考。具体分析如下:1.通过存储过程。2.通过C#中提供的Transaction。这里
- 前言上文讲的MyBatis部署运行且根据官网运行了一个demo:一步到位部署运行MyBatis3源码<保姆级>jdbc再贴一个J
- 一. ANR场景无论是四大组件或者进程等只要发生ANR,最终都会调用AMS.appNotResponding()方法,下面从这个方法说起。以
- 目录存储权限内部存储 外部存储适配存储权限Android Q 仍然使用 READ_EXTRNAL_STORAGE 和 WRITE_EXTRN
- Mybatis防止sql注入原理SQL 注入是一种代码注入技术,用于攻击数据驱动的应用,恶意的SQL 语句 * 入到执行的实体字段中(例如,为
- settings.xml有什么用?如果在Eclipse中使用过Maven插件,想必会有这个经验:配置settings.xml文件的路径。se
- 系统自带的VideoView有些视频格式不支持,那么我们可以用第三方实现的VideoView替代系统的来播放视频,比较流行的有ijkplay
- 提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档前言这两天在项目中使用到Java的导入导出功能,以前对这块有一定了解,但是没
- 目的官方的Drools范例大都是基于纯Java项目或Maven项目,而基于Spring Boot项目的很少。本文介绍如何在Spring Bo
- MyBatis使⽤PageHelper1.limit分⻚(1)概念:①页码:pageNum(用户会发送请求,携带页码pageNum给服务器)
- 前言一般生成的PDF文档默认的文档底色为白色,我们可以通过一定方法来更改文档的背景色,以达到文档美化以及保护双眼的作用。 以下内容提供了Ja
- 一维数组1.一维数组的定义方式:int[] array1 = new int[3];//声明创建一个包含3个元素的数组array1(初始值为
- 最近经常有人问Spring Cloud Feign如何上传文件。有团队的新成员,也有其他公司的兄弟。本文简单做个总结——早期的Spring
- 1、对属性进行封装,使用户不能直接输入数据,我们需要避免用户再使用"对象.属性"的方式对属性进行赋值。则需要将属性声明为
- 前些天在写个小程序,用到DataGridView,想给它动态的显示行号。不是很费劲GOOGLE了一下,这GOOGLE不要紧,发现了不少问题。
- 今天研究了下RecyclerView的滑动事件,特别是下拉刷新和加载更多事件,在现在几乎所有的APP显示数据列表时都用到了。自定义Recyc
- 开发设计搞了一个带圆形进度的进度条,在GitHub上逛了一圈,发现没有,自己撸吧。先看界面效果:主要思路是写一个继承ProgressBar的
- 测试APP时出现以下错误信息:Intel HAXM is required to run this AVD.Your CPU does no