越用RxJava,越觉得它好用,所以不知不觉地发现代码里到处都是RxJava的身影。然而,RxJava也不是银弹,其中仍然有很多问题需要解决。这里,我简单地总结一下自己遇到的一些“坑”,内容上可能会比较松散。
- 考虑主线程的切换
RxJava中一个常用的使用方法是——在其他线程中做处理,然后切换到UI线程中去更新页面。其中,线程切换就是使用了observeOn()。后台下载文件,前台显示下载进度就可以使用这种方式完成。然而,实践发现这其中有坑。如果文件比较大,而下载包的粒度又比较小,这将导致很多通知积压下来,最终导致错误。
这种错误其实也是可以理解的,毕竟MainLooper是根据Message来工作的,Message过多必然会导致一些问题。当然,这还是比较想当然的想法,最终还是需要到源码中一探究竟。ObserveOn的原理在前面关于RxJava的文章已经有过分析,这里还是简单列一下代码。其中的重点还是OperatorObserveOn的内部类——ObserveOnSubscriber。其重要代码片段如下:
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
/* The emission threshold that should trigger a replenishing request. */
final int limit;
the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
}
关键点就在于这个queue成员,这个队列存放了需要进行发送给下行线程的消息。对于主线程来说,符合其实是比较重的,从消息的生产者和消费者的模式讲,过多过快的消息会导致消息阻塞。甚至,都到不了阻塞的情况,因为queue的大小会有上限,在onNext()方法中的queue.offer()可能会产生异常,这取决于queue的实现方式。但无论如何都不可能无限大,所以无法保证绝对不出异常。
解决这个问题的方法其实也很简单,可以在生产者降低消息的产生频率。也可以在消息处理的时候先不进行线程切换,而是通过判断,在必要的时候进行线程切换,比如使用runOnUIThread()。
- RxJava避免内存泄漏
RxJava的响应式机制本质上还是回调实现的,因此内存泄漏也是会出现的。倘若不对Subscription进行管理,内存泄漏会非常严重。对于Subscription,其实有几个比较广泛使用的方法,比如RxLifecycle,以及简单的CompositeSubscription。至于它们的使用方法,其实都非常简单,这里就不赘述了。
说到内存泄漏,就谈点题外话,动画也可能导致内存泄漏。其原因仍然是一些回调函数,这些回调函数实现的View变化的功能,但是在被撤销以后,回调函数没有取消掉,同时View可能持有Context信息,从而导致内存泄漏。最近才发现,LoadToastView这个开源库一直存在内存泄漏,其原因正如上文所说。
发表评论