我正在寻找类似于exhaustMapfrom 的运算符的东西rxjs,但RX.NET似乎没有这样的运算符。


我不想要的是在每个元素上启动一个异步处理程序 - 当处理程序运行时,我想删除源元素。

我还怀疑我需要在这里巧妙地使用 defer 运算符?


标签: c#.netsystem.reactiverx.net


这是ExhaustMap运算符的实现。源 observable 被投影到IObservable<Task<TResult>>,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项目关联的新任务。然后用操作符删除重复出现的相同任务DistinctUntilChanged,最后用操作符将 observable 展平Concat

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;



    .Select(x => (int)x + 1)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))


Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9

更新:这是一个替代实现,其中function产生 anIObservable<TResult>而不是 a Task<TResult>

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
    return Observable.Defer(() =>
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(item =>
            // Attempt to acquire the mutex immediately. If successful, return
            // a sequence that releases the mutex when terminated. Otherwise,
            // return immediately an empty sequence.
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return function(item).Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<TResult>();
