本次分析的 RxJava 版本信息如下:
1 | implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' |
先来个简单例子:
1 | // 创建被观察者 |
以下的分析都基于这个例子,分析过程会分为三部分:
- subscribe() 流程
- subscribeOn() 流程
- observeOn() 流程
subscribe()
把 subscribe() 放在前面是因为后续的 subscribeOn() 和 observeOn() 流程都它的支撑,但是单纯的分析 subscribe() 流程没多大意义,所以这个流程基于 observable.subscribe(observer)
:
create()
首先是被观察者的创建过程,即 Observable.create(new ObservableOnSubscribe<Integer>() {...})
。ObservableOnSubscribe 是个接口,里面只有一个 subscribe() 方法,所以重点在 create() 方法:
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
看来 ObservableCreate 才是目标:
1 | public final class ObservableCreate<T> extends Observable<T> { |
很简单,只有两个方法,并且把 ObservableOnSubscribe 对象缓存为 source。
到这里 create() 方法算是执行完了,一句话总结:create() 方法实际上创建了一个 ObservableCreate 对象,并且持有了 ObservableOnSubscribe 对象的引用。
subscribe()
现在看看订阅方法 subscribe(),它在 Observable 抽象类中:
1 | public final void subscribe(Observer<? super T> observer) { |
subscribe() 主要就是调用了抽象方法 subscribeActual(),根据刚刚在 create() 方法的分析,这里调用到了 ObservableCreate 的 subscribeActual() 方法:
1 |
|
先是通过 observer 创建出一个 CreateEmitter 对象,CreateEmitter 是 ObservableCreate 其中一个内部类,主要功能就是对 Observer 的四个方法(onSubscribe()、onNext()、onError()、onComplete())进行了包装,并且提供了 dispose 系列方法:
1 | static final class CreateEmitter<T> extends AtomicReference<Disposable> |
回到 subscribeActual() 方法,然后执行了 onSubscribe() 方法,并把 CreateEmitter 对象引用传递出去,这时外部的 onSubscribe() 就得到了执行。
接下来执行 source.subscribe(parent)
,这个 source 就是 Observable.create() 方法传递的参数,所以这时就到了发送事件的地方。
调用 e.onNext()、e.onComplete()、e.onError()
其实都是调用了 CreateEmitter 中对应的方法,根据上面提供的 CreateEmitter 类源码可知最终调用的都是创建 Observer 时实现的方法。
observable.subscribe(observer)
流程算是完了,来个图总结下这个流程:
subscribeOn()
subscribeOn() 指定 Observable 在哪个调度器上执行,以下流程基于 subscribeOn(Schedulers.io()).subscribe()
:
Schedulers.io()
Schedulers 的源码并不多,这里只留下和 Schedulers.io()
相关的代码:
1 | public final class Schedulers { |
把 Schedulers.io()
相关源码都放在一起了,这样更清晰。
所以 Schedulers.io()
就是创建了一个 IoScheduler 对象
subscribeOn()
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
subscribeOn() 方法主要是创建了一个 ObservableSubscribeOn 对象,需要两个参数:
- this:根据第一部分 subscribe() 流程分析,此处是 ObservableCreate 对象
- scheduler:基于
Schedulers.io()
,所以此处是 IoScheduler 对象
ObservableSubscribeOn 类和 ObservableCreate 很相似,都是只有两个方法:
1 | public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { |
调用了父类构造方法缓存 source ,还缓存了 scheduler。
小结:subscribeOn(Schedulers.io())
过程就是创建了一个 ObservableSubscribeOn 对象,并且缓存了 ObservableCreate 和 IoScheduler 两个对象。
subscribe()
在 subscribe() 流程部分已经分析过 subscribe() 的内部执行逻辑了,但是这次的调用对象变成了 ObservableSubscribeOn,所以变成了调用 ObservableSubscribeOn 的 subscribeActual() 方法。
这次单独把 subscribeActual() 方法拿出来:
1 |
|
首先把 Observer 对象包装成 SubscribeOnObserver, SubscribeOnObserver 类是 ObservableSubscribeOn 的其中一个内部类:
1 | static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { |
回到 subscribeActual() 方法,执行 s.onSubscribe(parent)
也就是调用了 Observer 的 onSubscribe() 方法,到现在还没有涉及到线程的东西,所以 onSubscribe() 方法是在主线程回调的。
再看 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
,这里嵌套了 3 层,一层一层来:
new SubscribeTask(parent)
SubscribeTask 实现了 Runnable 接口,是 ObservableSubscribeOn 另一个内部类:
1 | final class SubscribeTask implements Runnable { |
SubscribeTask 目前缓存了 Observer 的包装类 SubscribeOnObserver。
scheduler.scheduleDirect(new SubscribeTask(parent))
先调用 Scheduler 的 scheduleDirect() 方法:
1 | public Disposable scheduleDirect(@NonNull Runnable run) { |
createWorker() 是个抽象方法,根据前面的分析,此处调用了 IoScheduler 的 createWorker() 方法,而 createWorker() 方法只是返回了一个 IoScheduler 的内部类 EventLoopWorker 对象。
DisposeTask 实现了 Runnable 和 Disposable 两个接口,并且在 run() 方法中调用了 decoratedRun.run()
。
所以 w.schedule(task, delay, unit)
才是一切的开始,别忘了此处 w 是 IoScheduler 的内部类 EventLoopWorker 对象。EventLoopWorker 的 schedule() 方法又调用了 ThreadWorker 的 scheduleActual() 方法,而 ThreadWorker 是继承自 NewThreadWorker 的。
在 NewThreadWorker 中创建了一个线程池,并且缓存为 executor:
1 | executor = SchedulerPoolFactory.create(threadFactory); |
所以接下来看看 NewThreadWorker 的 scheduleActual() 方法:
1 | public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { |
在上述代码第 16 行,把 sr 放到了线程池中执行,所以从这里开始就是在子线程中执行的操作。这个 sr 就是在第 5 行声明的 ScheduledRunnable 对象,ScheduledRunnable 实现了 Runnable 和 Callable 接口,所以根据线程池的尿性,最后肯定是执行 ScheduledRunnable 的 call() 方法:
1 |
|
此处的 actual 就是前面传递过来的 DisposeTask 对象,在 DisposeTask 的 run() 方法又调用了 decoratedRun.run()
,而 decoratedRun 又是个 SubscribeTask 对象,所以又到了 SubscribeTask 的 run() 方法执行 source.subscribe(parent)
。
由于 SubscribeTask 是 ObservableSubscribeOn 的内部类,所以此处的 source 是个 ObservableCreate 对象,这样就回到了第一部分:subscribe() 流程,但是不一样的是这次的执行都是在线程池中执行的。
subscribeOn(Schedulers.io()).subscribe()
流程也执行完了,上个图压压惊:
observeOn()
observeOn() 指定一个观察者在哪个调度器上观察这个 Observable,以下流程基于 observable.observeOn(AndroidSchedulers.mainThread()).subscribe()
:
AndroidSchedulers.mainThread()
AndroidSchedulers 的源码也不多,把 AndroidSchedulers 中涉及到 RxJavaPlugins 的两个方法也放在了一起,清晰:
1 | public final class AndroidSchedulers { |
所以 AndroidSchedulers.mainThread()
就是创建了一个 HandlerScheduler 对象,这个 HandlerScheduler 里面缓存了一个用 MainLooper 构造的 Handler 对象。
observeOn()
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
observeOn() 方法主要是创建了一个 ObservableObserveOn 对象,传入了四个参数,目前只需关注前面两个即可:
- this:根据第一部分 subscribe() 流程分析,此处是 ObservableCreate 对象
- scheduler:基于
AndroidSchedulers.mainThread()
,所以此处是 HandlerScheduler 对象
ObservableObserveOn 类也比较简单,调用了父类构造方法缓存 source ,还缓存了 scheduler:
1 | public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { |
小结:observeOn(AndroidSchedulers.mainThread())
过程就是创建了一个 ObservableObserveOn 对象,并且缓存了 ObservableCreate 和 HandlerScheduler 两个对象。
subscribe()
在 subscribe() 流程部分和 subscribeOn() 流程部分都已经分析过 subscribe() 的内部执行逻辑了,但是这次的调用对象变成了 ObservableObserveOn,所以变成了调用 ObservableObserveOn 的 subscribeActual() 方法。
这次单独把 subscribeActual() 方法拿出来:
1 |
|
直接看 else 部分,createWorker() 是个抽象方法,根据前面的分析,此处调用了 HandlerScheduler 的 createWorker() 方法,只是返回了一个 HandlerScheduler 的内部类 HandlerWorker 对象。
ObserveOnObserver 是 ObservableObserveOn 的内部类,实现了 Observer 和 Runnable 接口,这里只看一下它的构造方法:
1 | ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { |
接着看 source.subscribe()
,此处的 source 是个 ObservableCreate 对象,所以再来看看 ObservableCreate 的 subscribe() 方法:
1 |
|
这个方法其实前面已经了解过了,但是由于调用对象的不同导致里面的执行逻辑也不同。
这里的 observer 是经过前面封装的 ObserveOnObserver 对象,CreateEmitter 和前面是一样的,主要功能就是对 Observer 的四个方法(onSubscribe()、onNext()、onError()、onComplete())进行了包装,并且提供了 dispose 系列方法。
执行 observer.onSubscribe(parent)
就去到了 ObserveOnObserver 的 onSubscribe() 方法:
1 |
|
里面都会执行到 actual.onSubscribe(this)
,根据前面提供的 ObserveOnObserver 构造方法可知这个 actual 就是外部传递的 observer,所以此时外部的 onSubscribe() 方法被回调。
回到 subscribeActual() 方法看 source.subscribe(parent)
,这个 source 就是 Observable.create() 方法传递的参数,所以这时就到了发送事件的地方。
调用 e.onNext()、e.onComplete()、e.onError()
其实都是调用了 CreateEmitter 中对应的方法,根据上面分析可知最终调用的都是 ObserveOnObserver 中对应的方法:
1 | public void onNext(T t) { |
在 ObserveOnObserver 中对应的方法又都调用了 schedule() 方法,然后在调用了 worker.schedule(this)
。
还记得这个 worker 是谁么?是个 HandlerScheduler 对象,所以看一下它的 schedule() 方法:
1 |
|
ScheduledRunnable 是 HandlerScheduler 的内部类,实现了 Runnable 和 Disposable 接口。
下面就很熟悉了,通过 Handler 来发送消息,这个 Handler 是 AndroidSchedulers.mainThread()
中构建的,所以是运行在主线程的。
由于构建 Message 传递了 ScheduledRunnable 对象,所以最后回到了 ScheduledRunnable 的 run() 方法,这样就切换到了主线程:
1 |
|
这个 delegate 其实就是创建 ScheduledRunnable 传递进来的 run,也就是 ObserveOnObserver(别忘了 ObserveOnObserver 实现了 Runnable 接口),所以就跳转到了 ObserveOnObserver 的 run() 方法:
1 |
|
drainFused()
和 drainNormal()
里面就是关于 onNext()、onComplete()、onError()
的回调。
到这里 observable.observeOn(AndroidSchedulers.mainThread()).subscribe()
流程也执行完了,再来个图: