RxJava设计原理与解析<1>

本文出自:【InTheWorld的博客】

RxJava即Reactive Java,是响应式编程范式在Java上的一种实现。响应式编程范式的目标是,提高程序的容错性、降低软件模块的耦合性、提高程序相应速度。到目前为止,几乎所有主流语言都有相应的reactive库。其中,RxJava在android的开发中,应用的非常广泛。我使用RxJava也有一两个月了,期间断断续续的学习了RxJava。坦诚的说,刚开始的时候确实有点晕。首先,这种相应式的编程范式确实比较新颖,其次,我很好奇RxJava背后的实现原理,但一时间又想不通。既然想不通,我就打算看源码来分析,所以就有了这篇blog。

  • RxJava基本用法的实现

这里所说的基本用法是指,只使用Observable、Observer,而不使用其他高阶运算符的用法。毕竟研究问题,还是由浅入深相对容易理解。首先研究下面这个例子:

        Observable simple = Observable.just( 2, 3, 5, 8);
        simple.subscribe(
            (v) -> System.out.println(v),
            (e) -> {
                    System.err.println("Error");
                    System.err.println(e.getMessage());
                },
            () -> System.out.println("ended!")
        );

这个例子非常简单,就是把几个数字打印出去,最后输出“ended!”。但是这个例子又刚好包含了RxJava运行的主要原理,麻雀虽小五脏俱全哟!这里插一句话,这篇blog定位于研究RxJava原理,对入门阶段的同学可能并不合适。首先,研究Observable.just(),它的调用栈大致如下:

    //Observable.java
    public static  Observable just(T t1, T t2, T t3, T t4) {
        return from((T[])new Object[] { t1, t2, t3, t4 });
    }

    public static  Observable from(T[] array) {
    /* */
        return create(new OnSubscribeFromArray(array));
    }

    public static  Observable create(OnSubscribe f) {
        return new Observable(RxJavaHooks.onCreate(f));
    }

    protected Observable(OnSubscribe f) {
        this.onSubscribe = f;
    }

其中RxJavaHooks是一个用于管理组件生命周期的辅助类,这里不做研究。我们可以看出,Observable.just()首先把几个整数组成了一个数组,然后使用它们构造了一个OnSubscribeFromArray类,最后,新建了一个Observable实例并把前面的数组类传递给该实例。那么这个OnSubscribeFromArray类又有什么玄机呢?继续查看它的源码。

public final class OnSubscribeFromArray implements OnSubscribe {
    final T[] array;
    public OnSubscribeFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void call(Subscriber child) {
        child.setProducer(new FromArrayProducer(child, array));
    }
    /* */   
}

这构造函数非常简单,就是把数组保存起来。值得注意的是这个call方法,这个方法定义于OnSubscribe接口。该方法接收一个Subscriber对象,然后通过调用setProducer方法给该对象传递一个FromArrayProducer实例。这个类的代码,我们等一会再来看,先卖个关子。

通过Observable.just(),程序创建了一个Observable实例simple。然后simple.subscribe()方法为simple添加了一个观察者,其调用栈如下。

    //Observable.java
    public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onCompleted) {
    /* */
        return subscribe(new ActionSubscriber(onNext, onError, onCompleted));
    }

    public final Subscription subscribe(Subscriber subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static  Subscription subscribe(Subscriber subscriber, Observable observable) {
     // 
        try {
            /* */
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
        /* */
            return Subscriptions.unsubscribed();
        }
    }

这个Subscriber其实是用三个接口函数组成的。上述代码片段中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)在功能上等价于observable.onSubscribe.call(subscriber)。而observable的onSubscribe字段指向的就是OnSubscribeFromArray。所以,这句程序实际上就是调用了OnSubscribeFromArray.call。call函数实际调用了Subscriber.setproducer()。

    //Subscriber.java
    public void setProducer(Producer p) {
    /* */
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

这里重点就是producer.request()。而此处的Producer是一个FromArrayProducer,它的request方法最终会调用下面或者类似逻辑的代码:

        void fastPath() {
            final Subscriber child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

看到了child的各个接口在这里被依次调用,一切真相都水落石出了。这边blog也基本写到尾声了,最后以一个图结束了吧!代码和插图更配哟!    (吐舌笑脸)

RxJava

下一篇blog,准备解析一些高阶函数的实现原理。

发表评论