RxJava和观察者代码的并行执行

我正在使用RxJava Observable api使用以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());

observable

.buffer(10000)

.observeOn(Schedulers.computation())

.subscribe(recordInfo -> {

_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());

for(Info info : recordInfo) {

// some I/O operation logic

}

},

exception -> {

},

() -> {

});

我的期望是观察代码,即subscribe()方法中的代码,在我指定了计算调度程序后将并行执行。相反,代码仍在单线程上按顺序执行。如何使用RxJava

api使代码并行运行。

回答:

RxJava在异步/多线程方面经常被误解。多线程操作的编码很简单,但是了解抽象是另一回事。

关于RxJava的一个常见问题是如何实现并行化,或如何从Observable同时发出多个项目。当然,此定义违反了Observable

Contract,该协议规定onNext()必须被顺序调用,并且一次不能由多个线程同时调用。

要实现并行性,您需要多个Observable。

这在一个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())

.map(i -> intenseCalculation(i))

.subscribe(val -> System.out.println("Subscriber received "

+ val + " on "

+ Thread.currentThread().getName()));

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)

.subscribeOn(Schedulers.computation())

.map(i -> intenseCalculation(i))

).subscribe(val -> System.out.println(val));

代码和文本来自此博客文章。

以上是 RxJava和观察者代码的并行执行 的全部内容, 来源链接: utcz.com/qa/397913.html

回到顶部