While OnNext is running, I want to drop all incoming notifications except the latest

Problem description

I have an observable that produces items over time. The intervals between items are more or less random. I subscribe to the observable and process each item. Processing a single item can take a long time, so by the time OnNext finishes its work, many new items may have been produced. I need to take only the last of those items and process it. In other words, every time a long-running OnNext completes, I need to run OnNext for the latest item that was produced during the previous OnNext run. Can I do this with Rx?

I tried Window(1).Switch(), but it seems to run as soon as an item arrives, not when OnNext has completed.

Tags: c#, system.reactive, rx.net

Solution


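Here is a relatively simple custom DroppingDo operator that probably does what you want. It is somewhat similar to the built-in Do operator, with two differences: it invokes the action on the ThreadPool (or on whichever scheduler you pass in) instead of on the current thread, and it ignores items that are received while a previous action is still running. The latest of those items is preserved, though, and is processed as soon as the running action completes.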

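using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class DroppingDoExtensions
{
    /// <summary>
    /// Invokes an action sequentially for each element in the observable sequence,
    /// on the specified scheduler, skipping and dropping elements that are received
    /// during the execution of a previous action, except for the latest element.
    /// </summary>
    public static IObservable<TSource> DroppingDo<TSource>(
        this IObservable<TSource> source,
        Action<TSource> action,
        IScheduler scheduler = null)
    {
        // Arguments validation omitted
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            // Holds the most recent unprocessed item, one slot per subscription.
            // The Tuple wrapper lets null mean "nothing pending" for any TSource.
            Tuple<TSource> latest = null;
            return source
                .Select(item =>
                {
                    // Overwrite the pending slot. If it was already occupied, an
                    // earlier queued work item will pick up this newer value.
                    var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                    if (previous != null) return Observable.Empty<TSource>();
                    return Observable.Defer(() =>
                    {
                        // Runs when Concat reaches this work item: take whatever
                        // is the latest value at that moment and clear the slot.
                        var current = Interlocked.Exchange(ref latest, null);
                        Debug.Assert(current != null);
                        var unboxed = current.Item1;
                        return Observable.Start(
                            () => { action(unboxed); return unboxed; }, scheduler);
                    });
                })
                .Concat(); // Subscribes to one inner sequence at a time
        });
    }
}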

Usage example. Just replace code that probably looks like this:

someObservable
    .Subscribe(x => Process(x), ex => HandleError(ex));

With this:

someObservable
    .DroppingDo(x => Process(x))
    .Subscribe(_ => { }, ex => HandleError(ex));
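
To see the dropping behavior in isolation, here is a minimal sketch of a deterministic test, assuming the System.Reactive package is referenced. The operator is repeated from the answer so that the sketch compiles on its own. The `DroppingDoDemo` class, its `Run` method, and the `ManualResetEventSlim` gates are hypothetical names introduced only for this illustration. The first action is held open while items 2, 3 and 4 arrive, so only item 4 should be processed afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class DroppingDoExtensions
{
    // Same operator as in the answer, repeated so this sketch is self-contained.
    public static IObservable<TSource> DroppingDo<TSource>(
        this IObservable<TSource> source,
        Action<TSource> action,
        IScheduler scheduler = null)
    {
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            Tuple<TSource> latest = null;
            return source
                .Select(item =>
                {
                    var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
                    if (previous != null) return Observable.Empty<TSource>();
                    return Observable.Defer(() =>
                    {
                        var current = Interlocked.Exchange(ref latest, null);
                        Debug.Assert(current != null);
                        var unboxed = current.Item1;
                        return Observable.Start(
                            () => { action(unboxed); return unboxed; }, scheduler);
                    });
                })
                .Concat();
        });
    }
}

// Hypothetical demo harness, not part of the original answer.
public static class DroppingDoDemo
{
    public static List<int> Run()
    {
        var processed = new List<int>();
        using var started = new ManualResetEventSlim(false); // first action has begun
        using var release = new ManualResetEventSlim(false); // lets the first action finish
        using var done = new ManualResetEventSlim(false);    // whole pipeline completed
        var subject = new Subject<int>();

        using var subscription = subject
            .DroppingDo(x =>
            {
                lock (processed) processed.Add(x);
                // Simulate a long-running handler for the first item only.
                if (x == 1) { started.Set(); release.Wait(); }
            })
            .Subscribe(_ => { }, ex => Console.WriteLine(ex), () => done.Set());

        subject.OnNext(1);
        started.Wait();        // the action for item 1 is now running
        subject.OnNext(2);     // arrives while busy, later overwritten
        subject.OnNext(3);     // arrives while busy, later overwritten
        subject.OnNext(4);     // arrives while busy, latest one wins
        release.Set();         // let the action for item 1 complete
        subject.OnCompleted();
        done.Wait();           // wait until the pending item has been processed
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "1, 4"
    }
}
```

Items 2 and 3 never reach the action: each one was overwritten in the pending slot before the action for item 1 finished. The gates are there only to make the timing reproducible. In real code the handler would simply take however long it takes.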
