如何处理RxJava中观察者的onNext引发的异常?

考虑以下示例:

Observable.range(1, 10).subscribe(i -> {

System.out.println(i);

if (i == 5) {

throw new RuntimeException("oops!");

}

}, Throwable::printStackTrace);

这将输出从1到5的数字,然后打印异常。

我要实现的是使观察者保持订阅状态,并在引发异常后继续运行,即打印从1到10的所有数字。

我曾尝试使用retry()和其他各种错误处理运算符,但是,正如文档中所述,它们的目的是处理可观察对象自身发出的错误。

最直接的解决方案是将整个过程包装onNext到try-

catch块中,但这对我来说似乎不是一个好的解决方案。在类似的Rx.NET问题中,提出的解决方案是使扩展方法可以通过创建可观察的代理来进行包装。我试图重做:

Observable<Integer> origin = Observable.range(1, 10);

Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->

origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {

System.out.println(i);

if (i == 5) {

throw new RuntimeException("oops!");

}

}, Throwable::printStackTrace);

这不会改变任何东西,因为RxJava本身将订户包装为SafeSubscriber。使用unsafeSubscribe绕过它似乎也不是一个好的解决方案。

我该怎么做才能解决这个问题?

回答:

这是学习Rx时出现的常见问题。

TL; DR

建议您将异常处理逻辑放入订阅服务器中,而不是创建一个通用的可观察包装器。

说明

记住,Rx是将事件推送给订户。

从可观察的界面来看,很明显,除了可观察的对象处理事件花费的时间或任何引发的异常中包含的信息外,可观察的对象并没有真正了解用户。

通用包装器处理订户异常并继续向该订户发送事件是一个坏主意。

为什么?那么,可观察者应该只真正知道订户现在处于未知的故障状态。在这种情况下继续发送事件是不明智的,例如,订户可能处于一种状态,即从此刻开始的每个事件都将引发异常并花一些时间来处理。

一旦订户抛出异常,可观察者只有两种可行的行动方案:

  • 重新抛出异常
  • 实施常规处理以记录故障并停止发送事件(任何类型的事件),并清理归因于该订阅者的所有资源,并继续进行所有剩余的订阅。

对订户异常的特定处理将是一个糟糕的设计选择。这将在订户和可观察者之间造成不适当的行为耦合。因此,如果您想对不良订户有弹性,以上两个选择实际上就是可观察对象本身的明智责任极限。

如果您希望 订户 具有弹性并继续前进,则应绝对将其包装在异常处理逻辑中,该逻辑旨在处理您知道如何恢复的 特定异常

(并可能处理瞬时异常,日志记录,重试逻辑,断路等)。 )。

只有订户本身才有上下文来了解是否适合在遇到故障时接收其他事件。

如果您的情况需要开发可重用的错误处理逻辑,则将自己放在包装观察者的事件处理程序而不是

观察的事件的心态上,并且请注意不要在失败时盲目地进行事件传递。放开!尽管没有写过有关Rx的文章,但对于娱乐性软件工程经典来说,在最后一点上还有很多话要说。如果您还没有阅读,我强烈建议您。

以上是 如何处理RxJava中观察者的onNext引发的异常? 的全部内容, 来源链接: utcz.com/qa/415615.html

回到顶部