在我的spring boot应用程序中,我有一些用例,需要读取文件并写入数据库。由于文件很大,因此我需要分块读取和写入块(否则服务器会抛出OutOfMemoryError)。
我当时正在考虑使用RxJava,如下所示-
Iterator<Feature> features = getFeatures(file);
Observable<String> obs = Observable.create(
emitter -> {
try {
while (features.hasNext()) {
Feature feature = features.next();
emitter.onNext(feature);
}
emitter.onComplete();
} catch (Exception ex) {
emitter.onError(ex);
}
});
观察者代码-
public class FeatureWriter implements Observer<Feature> {
private FeatureDAO featureDAO;
@Override
public void onNext(@NonNull List<Feature> feature) {
try {
featureDAO.save(features);
} catch (DAOException cause) {
Exceptions.propagate(cause);
}
}
@Override
public void onError(@NonNull Throwable e) {
log.error("Error occurred while writing.... {}", e.getMessage());
throw new RuntimeException("Error reactive..");
}
}
订阅
featureObservable.buffer(10).subscribe(new FeatureWriter ());
如果Observer.onNext
抛出错误,我需要将异常返回给API的客户端(例如,我想将DAOException一直传播到API并由Spring boot @ControllerAdvice处理)。
也许这不是反应性的用例,但是由于RxJava提供的丰富功能(例如Flowable背压处理),我想以阻塞的方式使用RxJava。如何使用RxJava实现呢?我知道Rx有很多error operator,例如doOnError, onErrorComplete
等,但是所有这些运算符都不允许我将异常扔回上层。
我可以使用Rx默认错误处理程序吗?