【JS】在 Rxjs 中,为什么 mergeScan 操作符累加会失效?

我想用 promise 中返回的数据进行累加,于是应用了 rxjs 中的 mergeScan 操作符。但是在应用中发现,如果在返回的流中包含了 flatMap 过的 promise,则返回的值不会累加到累加器上面去:

const Rx = require('rxjs/Rx');

const {

of ,

} = require('rxjs');

const click$ = new Rx.Subject();

const seed = 0;

const testPromise = new Promise((resolve) => {

resolve(1);

})

const count$ = click$.mergeScan((acc, promise) => of (promise)

.flatMap(promise => promise)

.map((one) => {

console.log('acc', acc);

return acc + one

}), seed);

count$.subscribe(x => console.log('value',x));

click$.next(testPromise);

click$.next(testPromise);

click$.next(testPromise);

输出如下,累加器的值没有增加:

acc, 0

value, 1

acc, 0

value, 1

acc, 0

value, 1

但是如果去除掉 promise

const Rx = require('rxjs/Rx');

const {

of ,

} = require('rxjs');

const click$ = new Rx.Subject();

const seed = 0;

const count$ = click$.mergeScan((acc, one) => of (one)

.map((one) => {

console.log('acc', acc);

return acc + one

}), seed);

count$.subscribe(x => console.log('value', x));

click$.next(1);

click$.next(1);

click$.next(1);

输出是:

acc, 0

value, 1

acc, 1

value, 2

acc, 2

value, 3

可以正常的工作。
请问这是什么原因呢?我该如何修复这个问题?

回答

你是同步调用了三次next,导致并没有产生累加的过程。
可以添加observeOn(asyncScheduler),强制成异步。

const Rx = require('rxjs/Rx');

const {

of ,

} = require('rxjs');

const click$ = new Rx.Subject();

const seed = 0;

const testPromise = new Promise((resolve) => {

resolve(1);

})

const count$ = click$

.observeOn(Rx.asyncScheduler)

.mergeScan((acc, promise) => of (promise)

.flatMap(promise => promise)

.map((one) => {

console.log('acc', acc);

return acc + one

}), seed);

count$.subscribe(x => console.log('value',x));

click$.next(testPromise);

click$.next(testPromise);

click$.next(testPromise);

【JS】在 Rxjs 中,为什么 mergeScan 操作符累加会失效?

分析

mergeScan会在每次接受到新的数据时调用迭代函数,并订阅函数返回的流,之后会把返回的流的最后一个数据发送给下游.默认情况下是并发执行.

在你的第一个例子中,同步发射了三个Promise.
当第一个testPromisemergeScan接收时,mergeScanseedtestPromise传递给迭代函数的accpromise,并订阅迭代函数返回的流,因为流内部需要获取异步函数中返回的结果,所以这个流是个异步流.
当第二个testPromise被接收时,因为mergeScan是并发执行的,而第一个流是异步流,结果还没有返回,所以这个时候还是把seedtestPromise传递给迭代函数.
第三个testPromise也一样.

在第二个例子中,发射的是 1,这个时候迭代函数返回的是一个同步流,当迭代函数返回流的同时,这个流也同时结束,mergeScan能获取流的结果,所以当mergeScan接收第二个 1 的时候,已经获取到了返回流的最后一个数据 1,然后传递给迭代函数的acc,所以在第二个例子中能得到你想要的结果.

解决

知道原因之后,要让第一个例子也按照你想要的方式执行的话,就很简单了,只要让mergeScan接收到第二个testPromise的时候不是立即传递给迭代函数,而是等待第一个流返回结果之后,在把结果和testPromise传递给迭代函数,正好mergeScan的第三个参数concurrent就是控制同一时间内mergeScan的最大订阅数的,设置concurrent为 1 就能达到你想要的结果.

const count$ = click$.mergeScan(

(acc, promise) =>

of(promise)

.flatMap(promise => promise)

.map(one => {

console.log('acc', acc)

return acc + one

}),

seed,

1, // 将并发设置为 1

)

参考

以上是 【JS】在 Rxjs 中,为什么 mergeScan 操作符累加会失效? 的全部内容, 来源链接: utcz.com/a/84487.html

回到顶部