【JS】在 Rxjs 中,为什么 mergeScan 操作符累加会失效?
我想用 promise 中返回的数据进行累加,于是应用了 rxjs 中的 mergeScan
操作符。但是在应用中发现,如果在返回的流中包含了 flatMap
过的 promise
,则返回的值不会累加到累加器上面去:
const Rx = require('rxjs/Rx');const {
of ,
} = require('rxjs');
const click$ = new Rx.Subject();
const seed = 0;
const testPromise = new Promise((resolve) => {
resolve(1);
})
const count$ = click$.mergeScan((acc, promise) => of (promise)
.flatMap(promise => promise)
.map((one) => {
console.log('acc', acc);
return acc + one
}), seed);
count$.subscribe(x => console.log('value',x));
click$.next(testPromise);
click$.next(testPromise);
click$.next(testPromise);
输出如下,累加器的值没有增加:
acc, 0value, 1
acc, 0
value, 1
acc, 0
value, 1
但是如果去除掉 promise
:
const Rx = require('rxjs/Rx');const {
of ,
} = require('rxjs');
const click$ = new Rx.Subject();
const seed = 0;
const count$ = click$.mergeScan((acc, one) => of (one)
.map((one) => {
console.log('acc', acc);
return acc + one
}), seed);
count$.subscribe(x => console.log('value', x));
click$.next(1);
click$.next(1);
click$.next(1);
输出是:
acc, 0value, 1
acc, 1
value, 2
acc, 2
value, 3
可以正常的工作。
请问这是什么原因呢?我该如何修复这个问题?
回答
你是同步调用了三次next
,导致并没有产生累加的过程。
可以添加observeOn(asyncScheduler)
,强制成异步。
const Rx = require('rxjs/Rx');const {
of ,
} = require('rxjs');
const click$ = new Rx.Subject();
const seed = 0;
const testPromise = new Promise((resolve) => {
resolve(1);
})
const count$ = click$
.observeOn(Rx.asyncScheduler)
.mergeScan((acc, promise) => of (promise)
.flatMap(promise => promise)
.map((one) => {
console.log('acc', acc);
return acc + one
}), seed);
count$.subscribe(x => console.log('value',x));
click$.next(testPromise);
click$.next(testPromise);
click$.next(testPromise);
分析
mergeScan
会在每次接受到新的数据时调用迭代函数,并订阅函数返回的流,之后会把返回的流的最后一个数据发送给下游.默认情况下是并发执行.
在你的第一个例子中,同步发射了三个Promise
.
当第一个testPromise
被mergeScan
接收时,mergeScan
把seed
和testPromise
传递给迭代函数的acc
和promise
,并订阅迭代函数返回的流,因为流内部需要获取异步函数中返回的结果,所以这个流是个异步流.
当第二个testPromise
被接收时,因为mergeScan
是并发执行的,而第一个流是异步流,结果还没有返回,所以这个时候还是把seed
和testPromise
传递给迭代函数.
第三个testPromise
也一样.
在第二个例子中,发射的是 1,这个时候迭代函数返回的是一个同步流,当迭代函数返回流的同时,这个流也同时结束,mergeScan
能获取流的结果,所以当mergeScan
接收第二个 1 的时候,已经获取到了返回流的最后一个数据 1,然后传递给迭代函数的acc
,所以在第二个例子中能得到你想要的结果.
解决
知道原因之后,要让第一个例子也按照你想要的方式执行的话,就很简单了,只要让mergeScan
接收到第二个testPromise
的时候不是立即传递给迭代函数,而是等待第一个流返回结果之后,在把结果和testPromise
传递给迭代函数,正好mergeScan
的第三个参数concurrent
就是控制同一时间内mergeScan
的最大订阅数的,设置concurrent
为 1 就能达到你想要的结果.
const count$ = click$.mergeScan( (acc, promise) =>
of(promise)
.flatMap(promise => promise)
.map(one => {
console.log('acc', acc)
return acc + one
}),
seed,
1, // 将并发设置为 1
)
参考
- A use case for the mergeScan operator in RxJS: implementing requests in an infinite scroll list
- mergeScan
- mergeScan.ts
- synchronous vs asynchronous sequences in RxJS
以上是 【JS】在 Rxjs 中,为什么 mergeScan 操作符累加会失效? 的全部内容, 来源链接: utcz.com/a/84487.html