RxJava-将错误传播回RxJava中的主线程

发布时间:2020-07-07 17:30

在我的spring boot应用程序中,我有一些用例,需要读取文件并写入数据库。由于文件很大,因此我需要分块读取和写入块(否则服务器会抛出OutOfMemoryError)。

我当时正在考虑使用RxJava,如下所示-

Iterator<Feature> features = getFeatures(file);
Observable<String> obs =  Observable.create(
    emitter -> {
      try {
        while (features.hasNext()) {
          Feature feature = features.next();        
          emitter.onNext(feature);
        }
        emitter.onComplete();
      } catch (Exception ex) {
        emitter.onError(ex);
      }
    });

观察者代码-

    public class FeatureWriter implements Observer<Feature> {

      private FeatureDAO featureDAO;

      @Override
      public void onNext(@NonNull List<Feature> feature) {
        try {
          featureDAO.save(features);
        } catch (DAOException cause) {
          Exceptions.propagate(cause);
        }
      }
    
      @Override
      public void onError(@NonNull Throwable e) {
        log.error("Error occurred while writing.... {}", e.getMessage());
        throw new RuntimeException("Error reactive..");
      } 
   }

订阅

featureObservable.buffer(10).subscribe(new FeatureWriter ());

如果Observer.onNext抛出错误,我需要将异常返回给API的客户端(例如,我想将DAOException一直传播到API并由Spring boot @ControllerAdvice处理)。

也许这不是反应性的用例,但是由于RxJava提供的丰富功能(例如Flowable背压处理),我想以阻塞的方式使用RxJava。如何使用RxJava实现呢?我知道Rx有很多error operator,例如doOnError, onErrorComplete等,但是所有这些运算符都不允许我将异常扔回上层。 我可以使用Rx默认错误处理程序吗?

回答1