RxJava即Reactive Java,是响应式编程范式在Java上的一种实现。响应式编程范式的目标是,提高程序的容错性、降低软件模块的耦合性、提高程序相应速度。到目前为止,几乎所有主流语言都有相应的reactive库。其中,RxJava在android的开发中,应用的非常广泛。我使用RxJava也有一两个月了,期间断断续续的学习了RxJava。坦诚的说,刚开始的时候确实有点晕。首先,这种相应式的编程范式确实比较新颖,其次,我很好奇RxJava背后的实现原理,但一时间又想不通。既然想不通,我就打算看源码来分析,所以就有了这篇blog。
-
RxJava基本用法的实现
这里所说的基本用法是指,只使用Observable、Observer,而不使用其他高阶运算符的用法。毕竟研究问题,还是由浅入深相对容易理解。首先研究下面这个例子:
Observablesimple = Observable.just( 2, 3, 5, 8); simple.subscribe( (v) -> System.out.println(v), (e) -> { System.err.println("Error"); System.err.println(e.getMessage()); }, () -> System.out.println("ended!") );
这个例子非常简单,就是把几个数字打印出去,最后输出“ended!”。但是这个例子又刚好包含了RxJava运行的主要原理,麻雀虽小五脏俱全哟!这里插一句话,这篇blog定位于研究RxJava原理,对入门阶段的同学可能并不合适。首先,研究Observable.just(),它的调用栈大致如下:
//Observable.java public staticObservable just(T t1, T t2, T t3, T t4) { return from((T[])new Object[] { t1, t2, t3, t4 }); } public static Observable from(T[] array) { /* */ return create(new OnSubscribeFromArray (array)); } public static Observable create(OnSubscribe f) { return new Observable (RxJavaHooks.onCreate(f)); } protected Observable(OnSubscribe f) { this.onSubscribe = f; }
其中RxJavaHooks是一个用于管理组件生命周期的辅助类,这里不做研究。我们可以看出,Observable.just()首先把几个整数组成了一个数组,然后使用它们构造了一个OnSubscribeFromArray类,最后,新建了一个Observable实例并把前面的数组类传递给该实例。那么这个OnSubscribeFromArray类又有什么玄机呢?继续查看它的源码。
public final class OnSubscribeFromArrayimplements OnSubscribe { final T[] array; public OnSubscribeFromArray(T[] array) { this.array = array; } @Override public void call(Subscriber super T> child) { child.setProducer(new FromArrayProducer (child, array)); } /* */ }
这构造函数非常简单,就是把数组保存起来。值得注意的是这个call方法,这个方法定义于OnSubscribe接口。该方法接收一个Subscriber对象,然后通过调用setProducer方法给该对象传递一个FromArrayProducer实例。这个类的代码,我们等一会再来看,先卖个关子。
通过Observable.just(),程序创建了一个Observable实例simple。然后simple.subscribe()方法为simple添加了一个观察者,其调用栈如下。
//Observable.java public final Subscription subscribe(final Action1 super T> onNext, final Action1onError, final Action0 onCompleted) { /* */ return subscribe(new ActionSubscriber (onNext, onError, onCompleted)); } public final Subscription subscribe(Subscriber super T> subscriber) { return Observable.subscribe(subscriber, this); } static Subscription subscribe(Subscriber super T> subscriber, Observable observable) { // try { /* */ RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { /* */ return Subscriptions.unsubscribed(); } }
这个Subscriber其实是用三个接口函数组成的。上述代码片段中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)在功能上等价于observable.onSubscribe.call(subscriber)。而observable的onSubscribe字段指向的就是OnSubscribeFromArray。所以,这句程序实际上就是调用了OnSubscribeFromArray.call。call函数实际调用了Subscriber.setproducer()。
//Subscriber.java public void setProducer(Producer p) { /* */ if (passToSubscriber) { subscriber.setProducer(producer); } else { if (toRequest == NOT_SET) { producer.request(Long.MAX_VALUE); } else { producer.request(toRequest); } } }
这里重点就是producer.request()。而此处的Producer是一个FromArrayProducer,它的request方法最终会调用下面或者类似逻辑的代码:
void fastPath() { final Subscriber super T> child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted(); }
看到了child的各个接口在这里被依次调用,一切真相都水落石出了。这边blog也基本写到尾声了,最后以一个图结束了吧!代码和插图更配哟! ()
下一篇blog,准备解析一些高阶函数的实现原理。
发表评论