RxJava 触发流基本原理源码解析
作者:itbird01 发布时间:2023-06-24 06:02:57
本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。
为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。
触发流
到目前为止,我们讲了构建流、订阅流,但是依然没有触发真正的observer中的事件,例如:
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext s = " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
各位看官,莫急莫急,且听老衲娓娓道来。
还记得上面的订阅流吗?订阅流从右往左执行的,执行到最后的observable,执行了它的subscribe方法。我们从使用代码知道,最左端的observable是啥来着,大家还记得吗?当然是ObservableJust
private void test() {
//第一步:just调用
Observable.just("https://img-blog.csdn.net/20160903083319668")
//第二步:map调用
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
//Bitmap bitmap = downloadImage(s);
return null;
}
})
//第三步:subscribeOn、observeOn调用
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
//第四步:subscribe调用
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe() {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Bitmap s) {
Log.d(TAG, "onNext s = " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
我们就顺坡下驴,看一下ObservableJust的subscribe方法做啥了
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
仔细一看,这里面没有subscribe方法,那么肯定就是调用父类observable的subscribe方法了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//对象封装,暂时不是重点,我们跳过
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
大家看到这里,其实关键在于,最终调用了一个subscribeActual方法,所以我们继续看子类ObservableJust的subscribeActual方法干啥了?
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
接续根据ScalarDisposable的run方法
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
final Observer<? super T> observer;
final T value;
//...省略很多代码
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
//可以看到这里执行了onNext、onComplete方法
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
小结
看到这里,我们知道了,开始一层一层的从左往右去调用observer的相关方法了。 由订阅流可知,每层的observable实际上拥有下一层的observer的代理类,所以自然而然,从最左边开始调用observer的相关方法开始,触发流,就是从左往右,一层一层的剥开之前包裹的observer,然后顺序调用里面的onNext、onComplete等方法。 不信,我们挑一个ObservableMap来验证一下。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//此处调用了下游的observer的onNext方法
downstream.onNext(v);
}
}
}
可以看到里面,的确调用了下游的observer的onNext方法。
来源:https://juejin.cn/post/7182841957583224890


猜你喜欢
- 一 前言学习微服务要从基础的架构学起,首先你要有个微服务的概念才能学习对吧!!如果你都不知道啥是微服务,就一头扎进去学习,你自己也觉得自己也
- 本文实例讲述了Java基于栈方式解决汉诺塔问题。分享给大家供大家参考,具体如下:/** * 栈方式非递归汉诺塔 * @author zy *
- 本文实例讲述了C#基于委托实现多线程之间操作的方法。分享给大家供大家参考,具体如下:有的时候我们要起多个线程,更多的时候可能会有某个线程会去
- 本文实例讲述了Android中使用Service实现后台发送邮件功能。分享给大家供大家参考,具体如下:程序如下:import android
- 不知道大家对千篇一律的404 Not Found的错误页面是否感到腻歪了?其实通过很简单的配置就能够让Spring MVC显示您自定义的40
- Mybatis表现关联关系比hibernate简单,没有分那么细致one-to-many、many-to-one、one-to-one。而是
- 什么是Spring BootSpring Boot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring 应用的初
- 2017年一直以来在公司负责爬虫项目相关工程,主要业务有预定、库存、在开发中也遇到很多问题,随手记录一下,后续会持续更新。chrome、fi
- 前言看 WMS 代码的时候看到了 Handler.runWithScissors 方法,所以来恶补一下public static Windo
- 在编写程序,我们经常会对一些时间进行比较,比如要搜寻一个时间范围中的数据,需要用户输入开始时间和结束时间,如果结束时间小于或等于开始时间,那
- 模型对象的作用主要是保存数据,可以借助它们将数据带到前端。常用的模型对象有以下几个:ModelAndView(顾名思义,模型和视图,既可以携
- 今天用scheduled写定时任务的时候发现定时任务一秒重复执行一次,而我的cron表达式为 * 0/2 * * * * 。在源码调试的过程
- C# WinForm控件的拖动和缩放是个很有用的功能。实现起来其实很简单的,主要是设计控件的MouseDown、MouseLeave、Mou
- jackson反序列化忽略字段JSON字符串中含有我们并不需要的字段,那么当对应的实体类中不含有该字段时,会抛出一个异常,告诉你有些字段没有
- 原来的测试类的注解:@RunWith(SpringRunner.class)@SpringBootTest一直没法自动注入,后来在@Spri
- 一维数组1.一维数组的定义方式:int[] array1 = new int[3];//声明创建一个包含3个元素的数组array1(初始值为
- 本文实例讲述了Android开发中MotionEvent坐标获取方法。分享给大家供大家参考,具体如下:Android MotionEvent
- 使用的是 idea - Lifecycle-package 的方式打包(maven)确认 <packaging>wa
- 最近项目中经常需要用到自定义控件,因此自定义属性也是经常要用到的,在此说明一下自定义属性的用法:自定义属性都存在于/value/attr.x
- @Value值注入及配置文件组件扫描spring配置文件对应的是父容器,springMVC配置文件产生的是子容器,前者一般配置数据源,事务,