博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava2.0(四)线程之间切换的内部原理
阅读量:6079 次
发布时间:2019-06-20

本文共 8394 字,大约阅读时间需要 27 分钟。

基本代码

来看一下基本代码:

Observable.create((ObservableOnSubscribe
) e -> { e.onNext(1); e.onNext(2); e.onComplete(); }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(i -> System.out.println("onNext : i= " + i));复制代码

很简单,即订阅时将task交给子线程去做,而数据的回调则在Android主线程中执行。

一、subscribeOn()

点击查看源码:

public final Observable
subscribeOn(Scheduler scheduler) { //非空判断和hook ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn
(this, scheduler)); }复制代码

实际上这个方法返回了一个ObservableSubscribeOn对象。我们有理由猜测这个ObservableSubscribeOn应该和上文的ObservableMap及ObservableDoOnEach相似,都是Observable的一个包装类(装饰器):

//1.ObservableSubscribeOn也是Observable的一个装饰器public final class ObservableSubscribeOn
extends AbstractObservableWithUpstream
{ final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource
source, Scheduler scheduler) { //2.存储上游的ObservableSource和调度器 super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer
s) { //3.new 一个SubscribeOnObserver final SubscribeOnObserver
parent = new SubscribeOnObserver
(s); //4.回调方法,这说明下游的onSubscribe回调方法所在线程和线程调度无关 // 是订阅时所在的线程 s.onSubscribe(parent); //5.立即执行线程调度 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }}复制代码

前两步我们不需要 再多解释,直接看第三点,我们看看SubscribeOnObserver这个类:

SubscribeOnObserver

static final class SubscribeOnObserver
extends AtomicReference
implements Observer
, Disposable { private static final long serialVersionUID = 8094547886072529208L; //下游的Observer final Observer
actual; //保存上游的Disposable,自身dispose时,连同上游一起dispose final AtomicReference
s; SubscribeOnObserver(Observer
actual) { this.actual = actual; this.s = new AtomicReference
(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); }复制代码

类似Observable和ObservableMap,SubscribeOnObserver同样是Disposable和Observer的一个装饰器,提供了对下游数据的传递,以及将task dispose的接口。

第4步我们之前就讲过了,直接看第5步:

//5.立即执行线程调度        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));复制代码

我们看看SubscribeTask这个类:

SubscribeTask

final class SubscribeTask implements Runnable {        private final SubscribeOnObserver
parent; SubscribeTask(SubscribeOnObserver
parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }复制代码

难以置信的简单,SubscribeTask 仅仅是一个Runnable 接口的实现类而已,通过将SubscribeOnObserver作为参数存起来,在run()方法中添加了上游Observable的被订阅事件,就没有了别的操作,

接下来我们看一下scheduler.scheduleDirect(SubscribeTask)中的代码:

public abstract class Scheduler {    //...    public Disposable scheduleDirect(@NonNull Runnable run) {        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);    }    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {        // Worker 本身就是Disposable 的实现类        // 请注意, createWorker()所创建的worker,        // 实际就是Schdulers.io()所提供的IoScheduler所创建的worker        final Worker w = createWorker();        //hook相关        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);        DisposeTask task = new DisposeTask(decoratedRun, w);        //即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 立即执行task        w.schedule(task, delay, unit);        return task;    }    //...}复制代码

我们不要追究过深,我们看一下这个createWorker方法的注释说明:

/**     * Retrieves or creates a new {
@link Scheduler.Worker} that represents serial execution of actions. * 检索或创建一个新的{
@link Scheduler.Worker}表示一系列的action * * When work is completed it should be unsubscribed using {
@link Scheduler.Worker#dispose()}. * 当work完成后,应使用{
@link Scheduler.Worker#dispose()}取消订阅。 * * Work on a {
@link Scheduler.Worker} is guaranteed to be sequential. * {
@link Scheduler.Worker} 上面的work保证是顺序执行的 */复制代码

现在我们知道了:我们通过调用subscribeOn()传入Scheduler,当下游ObservableSource被订阅时(请注意,订阅顺序是由下到上的),距离最近的线程调度subscribeOn()方法中,保存的Scheduler会创建一个worker(对应相应的线程,本文中为IoScheduler),在其对应的线程中,立即执行task

多次subscribeOn()

现在考虑一个问题,假如在我们的代码中,多次使用了subscribeOn()代码,到线程会怎么处理呢?

上文已经讲到了,不管我们怎么通过subscribeOn()方法切换线程,由于订阅执行顺序是由下到上,因此当最上游的ObservableSource被订阅时,所在线程当然是距离上游最近的subscribeOn()所提供的线程,即最终Observable总是在第一个subscribeOn()所在的线程中执行。

二、observeOn()

先看observeOn()内部,果然是hook+Observable的包装类:

public final Observable
observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } public final Observable
observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); //实例化ObservableObserveOn对象并返回 return RxJavaPlugins.onAssembly(new ObservableObserveOn
(this, scheduler, delayError, bufferSize)); }复制代码

再看ObservableObserveOn:

public final class ObservableObserveOn
extends AbstractObservableWithUpstream
{ final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource
source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); //1.相关依赖注入 this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer
observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { //2.创建主线程的worker Scheduler.Worker w = scheduler.createWorker(); //3.上游数据源被订阅 source.subscribe(new ObserveOnObserver
(observer, w, delayError, bufferSize)); } }}复制代码

和subscribeOn()不同的是,我们并不是立即在对应的线程执行task,而是将对应的线程(实际上是worker)作为参数,实例化ObserveOnObserver并存储起来。

当上游的数据传递过来时,ObserveOnObserver执行对应的方法,比如onNext(T),再切换到对应线程中,并交由下游的Observer去接收:

ObserveOnObserver

ObserveOnObserver中代码极多,我们简单了解原理后,以onNext(T)为例:

static final class ObserveOnObserver
extends BasicIntQueueDisposable
implements Observer
, Runnable { //...省略其他代码 ObserveOnObserver(Observer
actual, Scheduler.Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; } //队列 SimpleQueue
queue; @Override public void onNext(T t) { if (done) { return; } //将数据存入队列 if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } //对应线程取出数据并交由下游的Observer schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } //...省略其他代码}复制代码

多次observerOn()

由上文得知,与subscribeOn()相反,observerOn()操作会将切换到对应的线程,然后交由下游的Observer处理,因此observerOn()仅对下游的Observer生效,并且,如果多次调用,observerOn()的线程调度会持续到下一个observerOn()操作之前。

总结

subscribeOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,先切换线程,然后立即执行task;

  • 当存在多个subscribeOn()方法时,仅第一个subscribeOn()有效。

observerOn()

  • 订阅顺序当从下到上,上游的ObservableSource被订阅时,会将对应的worker创建并作为构造参数存储在Observer的装饰器中,并不会立即切换线程;

  • 当数据由上游发送过来时,先将数据存储到队列中,然后切换线程,然后在新的线程中将数据发送给下游的Observer;

  • 当存在多个observerOn()方法时,仅对距下游下一个observerOn()之前的observer有效

有兴趣可以关注我的小专栏,学习更多知识:

转载地址:http://ahhgx.baihongyu.com/

你可能感兴趣的文章
SQL Server 2014如何提升非在线的在线操作
查看>>
成为MySQL DBA博客-性能配置调优
查看>>
【java开发系列】—— spring简单入门示例
查看>>
无人驾驶,敢问路在何方?
查看>>
轻松实现日志可视化?— 95后阿里云 MVP 王鹏翰的答案
查看>>
Maven实战(六)--- dependencies与dependencyManagement的区别
查看>>
基于Ethereum & IPFS的去中心化Ebay区块链项目开发实战
查看>>
jQuery tmpl用法总结
查看>>
武汉seo:做网站标题优化设置教程的老生常谈问题
查看>>
FFmpeg实现监控摄像头的RTSP协议转RTMP协议直播
查看>>
两个不同概念?物联网是人工智能的基石
查看>>
反射程序集
查看>>
你所不知道的CSS滤镜技巧与细节
查看>>
duilib 修复 容器控件 rightbordersize和bottombordersize属性显示错误的bug
查看>>
RTP协议分析
查看>>
大数据在云计算中转换的4个步骤
查看>>
云安全:信息安全风险长尾的终结者
查看>>
阿里巴巴CTO王坚:电视不会垮掉
查看>>
【小程序】微信小程序开发实践
查看>>
OpenStack 实现技术分解 (7) 通用库 — oslo_config
查看>>