RxJava Error Handling Operators
简介
RxJava提供了一些操作函数去处理错误相关的情景,使用这些函数可以:
- 吞掉这个错误,切换到一个备用的Observable继续发射数据
- 吞掉这个错误然后发射默认值
- 吞掉这个错误并立即尝试重启这个Observable
- 吞掉这个错误,在等待一段时间后重启这个Observable
相关处理函数
处理错误相关的操作函数包括:
onErrorReturn
public final Observable<T> onErrorReturn(Func1<java.lang.Throwable,? extends T> resumeFunction)
Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.
一般出错时,会调用onError,然后结束.使用onErrorReturn,onError不会调用,而是发送onErrorReturn里面提供的值,然后onCompleted会被调用. 注意,紧随其后onCompleted会调用.
例子:
Observable<String> observable = Observable.create(new OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> arg0) {
arg0.onNext("a1");
arg0.onNext("a2");
arg0.onError(new Exception("testErrorReturn mock error"));
}
});
observable.onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable arg0) {
Utils.println("onErrorReturn call");
return "aError";
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Utils.println("onCompleted");
}
@Override
public void onError(Throwable arg0) {
Utils.println("onError");
}
@Override
public void onNext(String arg0) {
Utils.println("onResult:"+arg0);
}
});
输出:
onResult:a1
onResult:a2
onErrorReturn call
onResult:aError
onCompleted
onErrorResumeNext
声明:
public final Observable<T> onErrorResumeNext(final Observable<? extends T> resumeSequence) {
public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction);
如果原Observable发生错误时,不调用onError,而是使用一个新的Observable,调用它的call函数(其实就是订阅它),里面onNext的值回传到subscriber,如果有调onCompleted,则subscriber会收到,如果没有则不会收到. 即后续的行为由新的Observable决定了.
上面两个函数的区别是第一个始终提供一个Observable,而第二个可以根据不同的Throwable提供不同的Observable.
例子:
Observable<String> observable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> arg0) {
arg0.onNext("value1");
arg0.onNext("value2");
arg0.onNext("value3");
arg0.onError(new Exception("mock testErrorResumeNext exception"));
arg0.onNext("value4");
}
});
Observable<String> resumeObservable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> arg0) {
arg0.onNext("valueMock1");
arg0.onNext("valueMock2");
}
});
observable.onErrorResumeNext(resumeObservable).subscribe(getSubscriber());
输出:
onResult:value1
onResult:value2
onResult:value3
onResult:valueMock1
onResult:valueMock2
如果resumeObservable调用onCompleted,则onCompleted会被调用.
onExceptionResumeNext
public final Observable<T> onExceptionResumeNext(final Observable<? extends T> resumeSequence);
Instructs an Observable to pass control to another Observable rather than invoking Observer#onError if it encounters an Exception.
*** 和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception. ***
retry
if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error.
如果Observable.onError调用,即发射错误,会重新订阅原来的Obserable,即重新触发Observable,也就是从新调它的call函数.这种方式数据会重复.
- public final Observable<T> retry():
无限重试,直到成功. - public final Observable<T> retry(final long count):
指定次数重试,超过指定次数还没成功,则subscriber.onError会被调用. - public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate):
传入一个Fun2,里面有两个入参,一个表示目前参数的次数,另一个就是Observable.onError里的类型,返回值true表示继续重试,false表示不重试,Subscriber.onError会被调用.
也就是说只要Observable.onError被调用后,会调用Func2去判断是否要重试,Integer表示询问重试的次数,从1开始.
retry(count)例子:
Observable<String> observable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> arg0) {
arg0.onNext("value1");
arg0.onNext("value2");
arg0.onNext("value3");
arg0.onError(new Exception("mock testRetryCount exception"));
arg0.onNext("value4");
}
});
observable.retry(1).subscribe(getSubscriber());
输出:
onResult:value1
onResult:value2
onResult:value3
onResult:value1
onResult:value2
onResult:value3
onError
从上面可以看出,重试了一次,后面不重试,Subscriber.onError被调用.另外由于重试,数据重复了.
retry(Func2)例子:
Observable<String> observable = Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> arg0) {
arg0.onNext("value1");
arg0.onNext("value2");
arg0.onNext("value3");
arg0.onError(new Exception("mock testRetryFunc2 exception"));
arg0.onNext("value4");
}
});
observable.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer arg0, Throwable arg1) {
Utils.println("testRetryFunc2 func2::call time:"+arg0.intValue());
return (arg0.intValue()>1)?false:true;
}
})
.subscribe(getSubscriber());
输出:
onResult:value1
onResult:value2
onResult:value3
testRetryFunc2 func2::call time:1
onResult:value1
onResult:value2
onResult:value3
testRetryFunc2 func2::call time:2
onError
Integer从1开始,第一次重试,第二次返回false所以就不重试了,直接onError了.
retryWhen
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler);
public final Observable<T> retryWhen(final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler, Scheduler scheduler);
if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
retryWhen的输入参数是Func1,接受一个输入Observable<Throwable>, 返回输出参数Observable<?>。
返回的Observable<?>所要发送的事件决定了重试是否会发生:
- 如果发送的是onCompleted或者onError事件,将不会触发重订阅。
- 如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件)。
这就是为什么使用了通配符作为泛型类型:这仅仅是个通知(next, error或者completed),一个很重要的通知而已。
*** 可以理解为如果原Observable.onError发生, 如果Func1返回的Observable<?>的onNext被调用,则重新订阅原来的Observable重试,否则不重试.***
输入的Observable(即Func1里Observable<?extends Throwable>)必须作为输出Observable(即Func1的返回值)的源。我们必须对Observable<Throwable>做出反应,然后基于它发送事件;不能只返回一个通用泛型流。
原Observable每次一调用onError(Throwable),Observable<Throwable>都会被作为输入传入方法中。换句话说就是,它的每一次调用我们都需要决定是否需要重订阅。
下面的例子会重试:
source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override public Observable<?> call(Observable<? extends Throwable> errors) {
return errors.flatMap(new Func1<Throwable, Observable<?>>() {
@Override public Observable<?> call(Throwable error) {
return Observable.timer(5, TimeUnit.SECONDS);
}
});
}
})
总结:
- onErrorReturn,吞掉错误,提供一个默认的返回值.
- onErrorResumeNext,吞掉错误,提供一个默认的Observable继续发送数据.
- onExceptionResumeNext,和onErrorResumeNext类似,区别是onErrorResumeNext是针对onError的throwable,而onExceptionResumeNext仅仅是针对onError里的类型是Exception.
- retry,吞掉错误,按条件重新订阅重试
- retryWhen,吞掉错误,由另外一个提供的Observable的行为决定是否重试,如果onNext被调用,则重新订阅重试,onError或者onComplete被调用,则不重试.
RxJava特有的异常
-
CompositeException:
表示发生了多个异常。你可以使用异常的 getExceptions() 方法获取单独的异常。 -
MissingBackpressureException:
表示一个订阅者或者操作符试图对一个不支持反压操作的Observable应用该操作。可以参考 [[Backpressure]] 查找针对没有实现反压操作的Observable的解决办法。 -
OnErrorFailedException:
表示Observable尝试调用观察者的 onError() 方法,但是那个方法自己抛出了异常。 -
OnErrorNotImplementedException:
表示一个Observable尝试调用它的观察者的onError() 方法,但是那个方法不存在。有多种方法可以消除这个错误:可以调整Observable使它不会到达这个错误条件,也可以在观察者中实现一个onError 处理器, 或者使用其它的操作符在错误到达之前拦截这个 onError 通知。 -
OnErrorThrowable:
观察者们可以用这种形式传递异常给它们的观察者们的 onError() 方法。相比标准的Throwable,这种Throwable包含更多的信息:错误本身和在错误发生时Observable的内部状态。
...
...