RxJava和观察者代码的并行执行
我正在使用RxJava Observable api使用以下代码:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
我的期望是观察代码,即subscribe()方法中的代码,在我指定了计算调度程序后将并行执行。相反,代码仍在单线程上按顺序执行。如何使用RxJava
api使代码并行运行。
回答:
RxJava在异步/多线程方面经常被误解。多线程操作的编码很简单,但是了解抽象是另一回事。
关于RxJava的一个常见问题是如何实现并行化,或如何从Observable同时发出多个项目。当然,此定义违反了Observable
Contract,该协议规定onNext()必须被顺序调用,并且一次不能由多个线程同时调用。
要实现并行性,您需要多个Observable。
这在一个线程中运行:
Observable<Integer> vals = Observable.range(1,10);vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
这在多个线程中运行:
Observable<Integer> vals = Observable.range(1,10);vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
代码和文本来自此博客文章。
以上是 RxJava和观察者代码的并行执行 的全部内容, 来源链接: utcz.com/qa/397913.html