RxJS:如何将多个嵌套的观测数据与缓冲区结合使用
警告:RxJS newb在这里。RxJS:如何将多个嵌套的观测数据与缓冲区结合使用
这里是我的挑战:
- 当
onUnlink$
观察到发射... - 立即开始从
onAdd$
观察到的捕获值,最多1秒(我会打电话给此分区onAddBuffer$
) 。 - 查询数据库(创建
doc$
观察到的)来获取我们将使用来匹配的onAdd$
值之一 - 如果从
onAddBuffer$
观察到的一个值的doc$
值相匹配的模式,不排放 - 如果没有值从
onAddBuffer$
观察到的doc$
值匹配,或者如果onAddBuffer$
观察到从来没有发出,发出doc$
值
这是我最好的猜测:
// for starters, concatMap doesn't seem right -- I want a whole new stream const docsToRemove$ = onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue }))
const onAddBuffer$ = onAdd$
.buffer(doc$) // capture events while fetching from db -- not sure about this
.takeUntil(Rx.Observable.timer(1000));
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap(doc =>
Rx.Observable.race(
onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()),
Rx.Observable.timer(1000).mapTo(doc)
)
);
});
docsToRemove$.subscribe(doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
这总是会发出EmptyObservable
。也许这是因为single
在没有匹配时似乎排出undefined
,并且我预计它在没有匹配时根本不会发出? find
发生同样的事情。
如果我将single
更改为filter
,则什么都不发出。
FYI:这是文件系统事件的重命名方案 - 如果add
事件的unlink
事件的1秒钟之内,接着给发出文件哈希匹配,什么也不做,因为它是一个rename
。否则,它是一个真正的unlink
,它应该发出要删除的数据库文档。
回答:
这是我的猜测,你怎么可以这样做:
onUnlink$.concatMap(unlinkValue => { const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
const onAddBuffer$ = onAdd$.buffer(bufferDuration$);
return Observable.forkJoin(onAddBuffer$, doc$)
.map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});
的single()
运营商是有点棘手,因为它发出的源可观察完成仅后的谓语功能相匹配的项目(或发出有两个项目或没有匹配项目时出错)。
race()
也很棘手。如果其中一个源Observable完成并且没有发出任何值race()
将刚刚完成并且不会发出任何东西。我前一段时间曾经报道过,这是正确的行为,请参阅https://github.com/ReactiveX/rxjs/issues/2641。
我想这是你的代码出了什么问题。
另请注意,.mapTo(Rx.Observable.empty())
会将每个值映射到Observable的实例。如果您想忽略所有值,则可以使用filter(() => false)
或ignoreElements()
运算符。
以上是 RxJS:如何将多个嵌套的观测数据与缓冲区结合使用 的全部内容, 来源链接: utcz.com/qa/264271.html