如何使用RX限制事件流?

我想有效地限制事件流,以便在接收到第一个事件时调用我的委托,但是如果接收到后续事件,则不放一秒钟。该超时时间(1秒)到期后,如果接收到后续事件,则希望调用我的委托。

有使用Reactive Extensions做到这一点的简单方法吗?

样例代码:

static void Main(string[] args)

{

Console.WriteLine("Running...");

var generator = Observable

.GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)

.Timestamp();

var builder = new StringBuilder();

generator

.Sample(TimeSpan.FromSeconds(1))

.Finally(() => Console.WriteLine(builder.ToString()))

.Subscribe(feed =>

builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",

feed.Value,

feed.Timestamp.ToString("mm:ss.fff"),

DateTime.Now.ToString("mm:ss.fff"))));

Console.ReadKey();

}

电流输出:

Running...

Observed 064, generated at 41:43.602, observed at 41:43.602

Observed 100, generated at 41:44.165, observed at 41:44.602

但我想观察一下(时间戳显然会改变)

Running...

Observed 001, generated at 41:43.602, observed at 41:43.602

....

Observed 100, generated at 41:44.165, observed at 41:44.602

回答:

这是我在RX论坛上获得的一些帮助:

这个想法是发行一系列“票”以激发原始顺序。这些“票证”因超时而被延迟,但不包括第一个票证,该票证将立即添加到票证序列中。当事件进入且有票证等待时,该事件会立即触发,否则会等到票证再触发。当它触发时,发出下一张票,依此类推…

为了将票证和原始事件结合起来,我们需要一个组合器。不幸的是,此处无法使用“标准”

.CombineLatest,因为它会在以前使用的票证和事件上触发。因此,我必须创建自己的组合器,该组合器基本上是经过过滤的.CombineLatest,它仅在组合中的两个元素均为“新鲜”时才触发-

之前从未返回过。我称之为.CombineVeryLatest又名.BrokenZip;)

使用.CombineVeryLatest,可以将上述想法实现为:

    public static IObservable<T> SampleResponsive<T>(

this IObservable<T> source, TimeSpan delay)

{

return source.Publish(src =>

{

var fire = new Subject<T>();

var whenCanFire = fire

.Select(u => new Unit())

.Delay(delay)

.StartWith(new Unit());

var subscription = src

.CombineVeryLatest(whenCanFire, (x, flag) => x)

.Subscribe(fire);

return fire.Finally(subscription.Dispose);

});

}

public static IObservable<TResult> CombineVeryLatest

<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,

IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)

{

var ls = leftSource.Select(x => new Used<TLeft>(x));

var rs = rightSource.Select(x => new Used<TRight>(x));

var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });

var fltCmb = cmb

.Where(a => !(a.x.IsUsed || a.y.IsUsed))

.Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });

return fltCmb.Select(a => selector(a.x.Value, a.y.Value));

}

private class Used<T>

{

internal T Value { get; private set; }

internal bool IsUsed { get; set; }

internal Used(T value)

{

Value = value;

}

}

编辑:这是AndreasKöpf在论坛上提出的CombineVeryLatest的另一个更紧凑的变体:

public static IObservable<TResult> CombineVeryLatest

<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,

IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)

{

return Observable.Defer(() =>

{

int l = -1, r = -1;

return Observable.CombineLatest(

leftSource.Select(Tuple.Create<TLeft, int>),

rightSource.Select(Tuple.Create<TRight, int>),

(x, y) => new { x, y })

.Where(t => t.x.Item2 != l && t.y.Item2 != r)

.Do(t => { l = t.x.Item2; r = t.y.Item2; })

.Select(t => selector(t.x.Item1, t.y.Item1));

});

}

以上是 如何使用RX限制事件流? 的全部内容, 来源链接: utcz.com/qa/405674.html

回到顶部