在上一篇blog中,我们主要讨论了一些关于map和flatMap实现方面的知识。但是flatMap远比map复杂,flatMap涉及了merge算子方面的东西。所以这篇博文将重点分析merge算子的实现原理。merge算子的功能是把多个Observable整合到一起,并创建一个新的Observable。听起来这个功能其实并不是很难理解,merge就像两江交汇一般,把各支流的水汇聚到一起。《Learning Reactive Programming》一书中有非常形象的配图。下图就描述了merge的功能:
从视觉上就能看得出来,merge与flatMap非常相似,而这也是flatMap底层使用merge实现的原因。一个例子让你明白flatMap的作用,代码如下。
int threadCt = Runtime.getRuntime().availableProcessors() + 1; ExecutorService executor = Executors.newFixedThreadPool(threadCt); Scheduler scheduler = Schedulers.from(executor); Observablesimple1 = Observable .just( 1, 2, 3, 4); Observable fm1 = simple1.flatMap( v -> Observable.just(v, v).subscribeOn(scheduler)); fm1.subscribe( (v) -> System.out.println(v), (e) -> { System.err.println("Error"); System.err.println(e.getMessage()); }, () -> System.out.println("ended!") );
这段程序的输出序列是:4, 4, 1, 1, 2, 3, 3, 2, ended!当然,这段程序的执行结果是不定的,因为这里的flapMap算子使用了并行执行的方式。先简单解释一下这段程序,前三行是在设置线程池,simple1是一个简单的整数序列值,传递给flatMap是一个这样的函数,它接受一个整数并返回一个包含两个整数拷贝的Observable。simple1中的每个值都将通过这个函数,所以会有4个中间Observable产生。然后RxJava通过merge运算符把多个Observable连接到一起。由于多线程并行运算,所以merge的结果就会显得没有规律。接下来就轮到merge运算符登场了。
merge功能的实现
首先,思考merge实现的逻辑,可以预见merge有缓冲区,因为会存在多线程传递数据的问题。事实上也确实如此,在搜寻代码时,我首先尝试寻找队列之类的数据结构,结果大有收获。且看如下代码逻辑:
public staticObservable merge(Observable extends Observable extends T>> source) { if (source.getClass() == ScalarSynchronousObservable.class) { return ((ScalarSynchronousObservable )source).scalarFlatMap((Func1)UtilityFunctions.identity()); } return source.lift(OperatorMerge. instance(false)); }
在这里lift的功能很简单,基本上就是构建一个“傀儡型”的OnSubscribe。真活还是OperatorMerge来干的,OperatorMerge提供了一个call函数,供傀儡OnSubscribe调用。该call函数的代码如下:
public Subscriber> call(final Subscriber super T> child) { MergeSubscriber subscriber = new MergeSubscriber (child, delayErrors, maxConcurrent); MergeProducer producer = new MergeProducer (subscriber); subscriber.producer = producer; child.add(subscriber); child.setProducer(producer); return subscriber; }
看到这里,大概也可以估计出merge的设计核心就在于MergeSubscriber和MergeProducer了。起初,我以为主要的逻辑应该在Producer中,然而事实上,重要的逻辑其实是放在MergeSubscriber中。这是因为Producer持有了Subscriber的应用,在下一级请求的时候,Producer只是简单的通知了Subscriber去emit一个值。所以,重要的逻辑还是需要到MergeSubscriber里边看。重要的代码如下:
public void onNext(Observable extends T> t) { if (t == null) { return; } if (t == Observable.empty()) { emitEmpty(); } else if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable extends T>)t).get()); } else { InnerSubscriberinner = new InnerSubscriber (this, uniqueId++); addInner(inner); t.unsafeSubscribe(inner); emit(); } } void addInner(InnerSubscriber inner) { getOrCreateComposite().add(inner); synchronized (innerGuard) { InnerSubscriber>[] a = innerSubscribers; int n = a.length; InnerSubscriber>[] b = new InnerSubscriber>[n + 1]; System.arraycopy(a, 0, b, 0, n); b[n] = inner; innerSubscribers = b; } }
作为一个Subscriber,最重要的逻辑当然是onNext了。onNext中最重要的逻辑就是addInner了,addInner的作用是使用RCU的方式增加merge的源。至于MergeSubscriber的emit函数的作用其实就是把变量从InnerSubscriber搬到下一级的subscriber。哈哈,一切都水落石出了!
至此,merge的流程就分析完毕了。对于下一篇blog,暂时想不到很想写的主题,改天再说了!
发表评论