RxJava设计原理与解析<2>

= 818

本文出自:【InTheWorld的博客】

在上一篇blog中,我们通过一个简单的例子探索了RxJava的设计原理。而这篇博客的主要内容是研究RxJava的高阶算子。然而,RxJava中的高阶算子非常之多,每一个分析是不太现实的。所以,这篇文章的内容主要以map和flatMap为例,分析一下RxJava中算子的实现方式。

  • map算子的原理

image

有过函数式编程语言学习经验的同学大概对map算子都不会感到陌生。即使没有学习过functional programming,也大概听过MapReduce。map算子的功能是把一个值作为输入,对应地输出一个值,实现了
“一一映射”的功能。在RxJava中,map的简单用法如下:

 Observable<Integer> simple = Observable.just( 2, 3, 5, 8);
    Observable<Integer> duplicate = simple.map(v -> 2*v);
    duplicate.subscribe(
        (v) -> System.out.println(v),
        (e) -> {
                System.err.println("Error");
                System.err.println(e.getMessage());
            },
        () -> System.out.println("ended!")
    );

上述代码片段中,通过map运算符,我们实现了把一个整数序列的值翻倍的功能。在这里map算子像一座桥梁,把Observable与Subscriber连接在一起。那么map是怎么实现这种功能的呢?不废话,看代码。

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
    }

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

好吧!我其实是很失望的,因为map和第一篇中的just好像也没有什么不同嘛!非也,map真正核心的东西其实是这个OnSubscribeMap。话不多说,来看看这个“核心”有什么不同的?

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
}

看这个OnSubscribeMap的构造函数就可以发现,这个类维护了两个重要的字段,即source和transformer,其中source是调用map算子的“源”Observable,而transformer则是map算子的参数,即“映射函数”。至于call函数接口,在上一篇博客中已经介绍,即Observable实例被订阅时(包括直接订阅或者间接订阅),该接口就会被调用。

OnSubscribeMap的call接口中,首先创建了一个MapSubscriber对象,顾名思义就是一个Subscriber,然后使用该Subscriber订阅了“源”Observable。请注意这里的迭代逻辑,map生成的Observable被订阅导致call接口被调用,在call接口中新建的Subscriber(订阅者)又会订阅“源”Observable。

  • flapMap算子的简介

image

flapMap是map的强化版,除了基本的一一映射,flapMap还能实现一对多的映射、通知排序等等。下面这个例子展示了flatMap如何实现多重映射:

   Observable<Integer> simple = Observable  
                .just( 2, 3, 5, 8);
        Observable<Integer> fm = simple.flatMap(i -> copy(i));
        fm.subscribe(
            (v) -> System.out.println(v),
            (e) -> {
                    System.err.println("Error");
                    System.err.println(e.getMessage());
                },
            () -> System.out.println("ended!")
        );
    }
    static Observable<Integer> copy(int a) {
        return Observable.<Integer>create( subscriber -> {
            subscriber.onNext(a);
            subscriber.onNext(a);
        });
    }

其实flatMap的实现关键在于merge,merge相关的内容,我准备在下一篇博客中进行分析。

发表评论