RxJS:如何将多个嵌套的观测数据与缓冲区结合使用

警告:RxJS newb在这里。RxJS:如何将多个嵌套的观测数据与缓冲区结合使用

这里是我的挑战:

  1. onUnlink$观察到发射...
  2. 立即开始从onAdd$观察到的捕获值,最多1秒(我会打电话给此分区onAddBuffer$) 。
  3. 查询数据库(创建doc$观察到的)来获取我们将使用来匹配的onAdd$值之一
  4. 如果从onAddBuffer$观察到的一个值的doc$值相匹配的模式,不排放
  5. 如果没有值从onAddBuffer$观察到的doc$值匹配,或者如果onAddBuffer$观察到从来没有发出,发出doc$

这是我最好的猜测:

// for starters, concatMap doesn't seem right -- I want a whole new stream 

const docsToRemove$ = onUnlink$.concatMap(unlinkValue => {

const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue }))

const onAddBuffer$ = onAdd$

.buffer(doc$) // capture events while fetching from db -- not sure about this

.takeUntil(Rx.Observable.timer(1000));

// if there is a match, emit nothing. otherwise wait 1 second and emit doc

return doc$.switchMap(doc =>

Rx.Observable.race(

onAddBuffer$.single(added => doc.attr === added.attr).mapTo(Rx.Observable.empty()),

Rx.Observable.timer(1000).mapTo(doc)

)

);

});

docsToRemove$.subscribe(doc => {

// should only ever be invoked (with doc -- the doc$ value) 1 second

// after `onUnlink$` emits, when there are no matching `onAdd$`

// values within that 1 second window.

})

这总是会发出EmptyObservable。也许这是因为single在没有匹配时似乎排出undefined,并且我预计它在没有匹配时根本不会发出? find发生同样的事情。

如果我将single更改为filter,则什么都不发出。

FYI:这是文件系统事件的重命名方案 - 如果add事件的unlink事件的1秒钟之内,接着给发出文件哈希匹配,什么也不做,因为它是一个rename。否则,它是一个真正的unlink,它应该发出要删除的数据库文档。

回答:

这是我的猜测,你怎么可以这样做:

onUnlink$.concatMap(unlinkValue => { 

const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();

const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);

const onAddBuffer$ = onAdd$.buffer(bufferDuration$);

return Observable.forkJoin(onAddBuffer$, doc$)

.map(([buffer, docResponse]) => { /* whatever logic you need here */ });

});

single()运营商是有点棘手,因为它发出的源可观察完成仅后的谓语功能相匹配的项目(或发出有两个项目或没有匹配项目时出错)。

race()也很棘手。如果其中一个源Observable完成并且没有发出任何值race()将刚刚完成并且不会发出任何东西。我前一段时间曾经报道过,这是正确的行为,请参阅https://github.com/ReactiveX/rxjs/issues/2641。
我想这是你的代码出了什么问题。

另请注意,.mapTo(Rx.Observable.empty())会将每个值映射到Observable的实例。如果您想忽略所有值,则可以使用filter(() => false)ignoreElements()运算符。

以上是 RxJS:如何将多个嵌套的观测数据与缓冲区结合使用 的全部内容, 来源链接: utcz.com/qa/264271.html

回到顶部