RxJava设计原理解析<3>

= 1014

本文出自:【InTheWorld的博客】

    在上一篇blog中,我们主要讨论了一些关于map和flatMap实现方面的知识。但是flatMap远比map复杂,flatMap涉及了merge算子方面的东西。所以这篇博文将重点分析merge算子的实现原理。merge算子的功能是把多个Observable整合到一起,并创建一个新的Observable。听起来这个功能其实并不是很难理解,merge就像两江交汇一般,把各支流的水汇聚到一起。《Learning Reactive Programming》一书中有非常形象的配图。下图就描述了merge的功能:

image

    从视觉上就能看得出来,merge与flatMap非常相似,而这也是flatMap底层使用merge实现的原因。一个例子让你明白flatMap的作用,代码如下。

     int threadCt = Runtime.getRuntime().availableProcessors() + 1;
        ExecutorService executor = Executors.newFixedThreadPool(threadCt);
        Scheduler scheduler = Schedulers.from(executor);
        
        Observable<Integer> simple1 = Observable  
                .just( 1, 2, 3, 4);
        Observable<Integer> fm1 = simple1.flatMap(
                v -> Observable.just(v, v).subscribeOn(scheduler));
        fm1.subscribe(
            (v) -> System.out.println(v),
            (e) -> {
                    System.err.println("Error");
                    System.err.println(e.getMessage());
                },
            () -> System.out.println("ended!")
        );

   这段程序的输出序列是:4, 4, 1, 1, 2, 3, 3, 2, ended!当然,这段程序的执行结果是不定的,因为这里的flapMap算子使用了并行执行的方式。先简单解释一下这段程序,前三行是在设置线程池,simple1是一个简单的整数序列值,传递给flatMap是一个这样的函数,它接受一个整数并返回一个包含两个整数拷贝的Observable。simple1中的每个值都将通过这个函数,所以会有4个中间Observable产生。然后RxJava通过merge运算符把多个Observable连接到一起。由于多线程并行运算,所以merge的结果就会显得没有规律。接下来就轮到merge运算符登场了。


  • merge功能的实现

   首先,思考merge实现的逻辑,可以预见merge有缓冲区,因为会存在多线程传递数据的问题。事实上也确实如此,在搜寻代码时,我首先尝试寻找队列之类的数据结构,结果大有收获。且看如下代码逻辑:

    public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

   在这里lift的功能很简单,基本上就是构建一个“傀儡型”的OnSubscribe。真活还是OperatorMerge来干的,OperatorMerge提供了一个call函数,供傀儡OnSubscribe调用。该call函数的代码如下:

    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
        MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
        MergeProducer<T> producer = new MergeProducer<T>(subscriber);
        subscriber.producer = producer;
        child.add(subscriber);
        child.setProducer(producer);
        return subscriber;
    }

   看到这里,大概也可以估计出merge的设计核心就在于MergeSubscriber和MergeProducer了。起初,我以为主要的逻辑应该在Producer中,然而事实上,重要的逻辑其实是放在MergeSubscriber中。这是因为Producer持有了Subscriber的应用,在下一级请求的时候,Producer只是简单的通知了Subscriber去emit一个值。所以,重要的逻辑还是需要到MergeSubscriber里边看。重要的代码如下:

        public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t == Observable.empty()) {
                emitEmpty();
            } else
            if (t instanceof ScalarSynchronousObservable) {
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                addInner(inner);
                t.unsafeSubscribe(inner);
                emit();
            }
        }
        void addInner(InnerSubscriber inner) {
            getOrCreateComposite().add(inner);
            synchronized (innerGuard) {
                InnerSubscriber[] a = innerSubscribers;
                int n = a.length;
                InnerSubscriber[] b = new InnerSubscriber[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = inner;
                innerSubscribers = b;
            }
        }

    作为一个Subscriber,最重要的逻辑当然是onNext了。onNext中最重要的逻辑就是addInner了,addInner的作用是使用RCU的方式增加merge的源。至于MergeSubscriber的emit函数的作用其实就是把变量从InnerSubscriber搬到下一级的subscriber。哈哈,一切都水落石出了!

至此,merge的流程就分析完毕了。对于下一篇blog,暂时想不到很想写的主题,改天再说了!

发表评论