RxJava原理之Scheduler

= 1646

本文出自:【InTheWorld的博客】

线程的切换是RxJava一个重要的特性,并行运算和指定线程运行都是基于这种特性的。 其中,RxJava使用Scheduler类来抽象执行体(线程或者线程池)。而指定Scheduler的方式有observeOn()和subscribeOn()这两种。本文的标题是关于Scheduler原理方面的,但是直接去看Scheduler的实现其实是一种比较笨的办法。因为Scheduler其实是一个功能类,从使用方式上研究是一种更容易的方式。所谓的使用方式其实就是observeOn()和subscribeOn()这两个方法了。

  • observeOn方法

废话少说,还是先来看一个observeOn()方法的使用例子吧!

    public static void main(String[] args) {
        Observable.range(20, 5)
            .doOnEach(debug("number", ""))
            .observeOn(Schedulers.computation())
            .map(v -> v + v)
            .doOnEach(debug("doubled", ""))
            .subscribe();
    }

代码段中的debug函数的作用只是格式化输出,所以省略了它的代码。上述程序的某次运行结果如下,看起来还是挺凌乱的。但还是可以发现,observeOn成功的切换了后续代码的运行线程。

 main | number :  >20
 main | number :  - >21
 RxComputationScheduler-1 | doubled :  >40
 main | number :  - - >22
 RxComputationScheduler-1 | doubled :  - >42
 main | number :  - - - >23
 RxComputationScheduler-1 | doubled :  - - >44
 main | number :  - - - - >24
 RxComputationScheduler-1 | doubled :  - - - >46
 main | number :  - - - - - > | 
 RxComputationScheduler-1 | doubled :  - - - - >48
 RxComputationScheduler-1 | doubled :  - - - - - > | 

与其说“线程切换”,更科学的叫法应该是线程间通信,observeOn的逻辑大概就是把链上的消息通过线程间通信的方法传递给另外一个线程。observeOn方法指定的执行体执行指定的工作。这样的一个事务逻辑便是所谓的“线程切换”。理论上的分析是这样,那么RxJava实际又是怎么实现的呢?observeOn的源代码其实挺简单的,主要代码如下:

        public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed() || finished) {
                return;
            }
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }
        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(this);
            }
        }

这里的recursiveScheduler其实就是我们所说的执行体,observeOn方法是通过ObserveOnOperator实现的。更底层的实现是一个Subscriber,它的onNext()方法接收一个消息,然后存在队列queue里,然后通知执行体执行自己。需要指出的是,这个ObserveOnSubscriber实现了Action0接口(一个void call()方法),所以它可以直接通知Scheduler执行自己。至于call方法,其实也很简单,它的任务便是从queue中取出消息执行并传递下去。

  • subscribeOn方法

虽然说起来,subscribeOn和observeOn都是用于线程切换的,但他们还是有挺大区别的。RxJava的设计中最重要的两个流程,即订阅和通知。简单说来,subscribeOn是在订阅的链路上实现线程切换,而observeOn是在通知的链路上实现线程切换。从理论上讲,订阅链路和通知链路是可以统一的(使用同一个线程),但实际上很少会这样做。一个比较有趣的例子如下:

       CountDownLatch latch = new CountDownLatch(1) ;
        Observable<?> range = Observable
                . range( 20, 3)
                . subscribeOn(Schedulers. newThread() )
                . doOnEach( debug( " Source" , "") )
                . subscribeOn(Schedulers. io() )
                . map( n -> n + 48)
                . doOnEach( debug( " +48 " , " " ) )
                . subscribeOn(Schedulers. computation() )
                . map( n -> Character. toChars( n) )
                . map( c -> c[ 0] )
                . doOnEach( debug( " Chars " , " " ) );
        range. subscribe( ) ;
        latch. await( ) ;

上面这段代码使用了三个subscribeOn,但是实际的运行结果是,所有的debug输出都在Schedulers.newThread()的线程中。这个问题的答案其实就在subcribeOn的实现中。

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

                source.unsafeSubscribe(s);
            }
        });
    }
}

看这段代码就明白了吧,source.unsafeSubscribe(s)是在Scheduler的线程中完成的,然后就返回了。所以这些onNext,onCompleted就会在最后的那个线程中执行,当然前提是没有ObserveOn来“添乱”。

这里写图片描述

这个图不是很对应我前面的例子,这里就将就着用一下。上面展示了整个订阅的流程,这种迭代式的回调方式,最前面的那个线程将会调用各个Subscriber的回调。所以会出现前文所示的现象。

发表评论