c# - 如何在 Rx.Net 中实现排气映射处理程序?
问题描述
我正在寻找类似于exhaustMap
from 的运算符的东西rxjs
,但RX.NET
似乎没有这样的运算符。
我需要实现的是,在源流的每个元素上,我需要启动一个async
处理程序,直到它完成,我想从源中删除任何元素。处理程序完成后,继续获取元素。
我不想要的是在每个元素上启动一个异步处理程序 - 当处理程序运行时,我想删除源元素。
我还怀疑我需要在这里巧妙地使用 defer 运算符?
谢谢!
解决方案
这是ExhaustMap
运算符的实现。源 observable 被投影到IObservable<Task<TResult>>
,其中每个后续任务要么是前一个任务(如果它仍在运行),要么是与当前项目关联的新任务。然后用操作符删除重复出现的相同任务DistinctUntilChanged
,最后用操作符将 observable 展平Concat
。
/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> function)
{
return source
.Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
{
return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
})
.DistinctUntilChanged()
.Concat();
async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
function
不能保证返回的任务是不同的,因此需要HideIdentity
返回任务的不同包装器的本地函数。
使用示例:
Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => (int)x + 1)
.Take(10)
.Do(x => Console.WriteLine($"Input: {x}"))
.ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
.Do(x => Console.WriteLine($"Result: {x}"))
.Wait();
输出:
Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9
更新:这是一个替代实现,其中function
产生 anIObservable<TResult>
而不是 a Task<TResult>
:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> function)
{
return Observable.Defer(() =>
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(item =>
{
// Attempt to acquire the mutex immediately. If successful, return
// a sequence that releases the mutex when terminated. Otherwise,
// return immediately an empty sequence.
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return function(item).Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<TResult>();
});
});
}
推荐阅读
- arrays - 如何取 2 列,一列用于名称,一列用于值,并吐出每个唯一名称的唯一数量?然后列个清单
- javascript - Highcharts 股票图表未显示自定义数据
- visual-studio-2019 - 如何在 Visual Studio 2019 中更改分析文件输出位置?
- python - 连接的位于基本路径组件之外
- arrays - 将 c 中的值交换为反向数组,但数组/内存块保持不变
- tensorflow - Keras Tuner 和 keras 功能 API
- reactjs - 有没有办法从 tomcat 读取属性文件进行反应?
- flutter - Flutter FluentUI NavigationView 在 web 上崩溃“期望一个类型为‘SkDeletable’的值,但得到一个类型为‘Null’的值
- android - Admob 自适应横幅扩展
- php - 即使明确指定了另一个队列,Laravel 也将作业调度到默认队列