在上一篇blog中,我们通过一个简单的例子探索了RxJava的设计原理。而这篇博客的主要内容是研究RxJava的高阶算子。然而,RxJava中的高阶算子非常之多,每一个分析是不太现实的。所以,这篇文章的内容主要以map和flatMap为例,分析一下RxJava中算子的实现方式。
-
map算子的原理
有过函数式编程语言学习经验的同学大概对map算子都不会感到陌生。即使没有学习过functional programming,也大概听过MapReduce。map算子的功能是把一个值作为输入,对应地输出一个值,实现了
“一一映射”的功能。在RxJava中,map的简单用法如下:
Observablesimple = Observable.just( 2, 3, 5, 8); Observable duplicate = simple.map(v -> 2*v); duplicate.subscribe( (v) -> System.out.println(v), (e) -> { System.err.println("Error"); System.err.println(e.getMessage()); }, () -> System.out.println("ended!") );
上述代码片段中,通过map运算符,我们实现了把一个整数序列的值翻倍的功能。在这里map算子像一座桥梁,把Observable与Subscriber连接在一起。那么map是怎么实现这种功能的呢?不废话,看代码。
public finalObservable map(Func1 super T, ? extends R> func) { return create(new OnSubscribeMap (this, func)); } public static Observable create(OnSubscribe f) { return new Observable (RxJavaHooks.onCreate(f)); } protected Observable(OnSubscribe f) { this.onSubscribe = f; }
好吧!我其实是很失望的,因为map和第一篇中的just好像也没有什么不同嘛!非也,map真正核心的东西其实是这个OnSubscribeMap。话不多说,来看看这个“核心”有什么不同的?
public final class OnSubscribeMapimplements OnSubscribe { final Observable source; final Func1 super T, ? extends R> transformer; public OnSubscribeMap(Observable source, Func1 super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } @Override public void call(final Subscriber super R> o) { MapSubscriber parent = new MapSubscriber (o, transformer); o.add(parent); source.unsafeSubscribe(parent); } }
看这个OnSubscribeMap的构造函数就可以发现,这个类维护了两个重要的字段,即source和transformer,其中source是调用map算子的“源”Observable,而transformer则是map算子的参数,即“映射函数”。至于call函数接口,在上一篇博客中已经介绍,即Observable实例被订阅时(包括直接订阅或者间接订阅),该接口就会被调用。
OnSubscribeMap的call接口中,首先创建了一个MapSubscriber对象,顾名思义就是一个Subscriber,然后使用该Subscriber订阅了“源”Observable。请注意这里的迭代逻辑,map生成的Observable被订阅导致call接口被调用,在call接口中新建的Subscriber(订阅者)又会订阅“源”Observable。
-
flapMap算子的简介
flapMap是map的强化版,除了基本的一一映射,flapMap还能实现一对多的映射、通知排序等等。下面这个例子展示了flatMap如何实现多重映射:
Observablesimple = Observable .just( 2, 3, 5, 8); Observable fm = simple.flatMap(i -> copy(i)); fm.subscribe( (v) -> System.out.println(v), (e) -> { System.err.println("Error"); System.err.println(e.getMessage()); }, () -> System.out.println("ended!") ); } static Observable copy(int a) { return Observable. create( subscriber -> { subscriber.onNext(a); subscriber.onNext(a); }); }
其实flatMap的实现关键在于merge,merge相关的内容,我准备在下一篇博客中进行分析。
发表评论