c# - While the OnNext is running, I want to drop all incoming notifications except from the latest
问题描述
I have some observable, that produces items over time. Intervals between items is almost random. I subscribe to my observable and process item, each process can take a lot of time and when OnNext
ends its work, many new items are produced. I need to take only last item and process it. So every time long OnNext
operation completes I need do OnNext
for latest item from items that was produced during previous OnNext
run.
Can I do it with Rx?
I tried Window(1).Switch()
but seems like it executes immediately when item comes, not when OnNext
completed.
解决方案
这是相对简单的自定义DroppingDo
运算符,它可能满足您的需求。它与内置Do
操作符有些相似,不同之处在于它在而不是当前线程上调用操作ThreadPool
,并且它忽略在前一个操作运行时接收到的项目。不过,最新的项目被保留了。
/// <summary>
/// Invokes an action sequentially for each element in the observable sequence,
/// on the specified scheduler, skipping and dropping elements that are received
/// during the execution of a previous action, except from the latest element.
/// </summary>
public static IObservable<TSource> DroppingDo<TSource>(
this IObservable<TSource> source,
Action<TSource> action,
IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
Tuple<TSource> latest = null;
return source
.Select(item =>
{
var previous = Interlocked.Exchange(ref latest, Tuple.Create(item));
if (previous != null) return Observable.Empty<TSource>();
return Observable.Defer(() =>
{
var current = Interlocked.Exchange(ref latest, null);
Debug.Assert(current != null);
var unboxed = current.Item1;
return Observable.Start(
() => { action(unboxed); return unboxed; }, scheduler);
});
})
.Concat();
});
}
用法示例。只需替换可能如下所示的代码:
someObservable
.Subscribe(x => Process(x), ex => HandleError(ex));
有了这个:
someObservable
.DroppingDo(x => Process(x))
.Subscribe(_ => { }, ex => HandleError(ex));
推荐阅读
- javascript - 在事件中传递 JSON 文件:事件无法读取键,只能读取值
- ios - GRPC IOS 客户端连接丢失 - 目标 C
- python - 使用 QFileSystemModel 展开 QTreeView 中的项目
- javascript - 动态创建的下拉菜单未打开
- microsoft-graph-api - 在“oneDrive.item”类型上获取“属性 'openWith' 不是导航属性。” 尝试访问工作簿 api 时
- windows - 从不同的 powershell 会话访问在一个 powershell 会话中创建的远程 powershell 会话
- python-3.x - 我想拆分数据并按行和列获取值
- firebase - Firestore 超时
- javascript - 大量的 mongodb 动态集合对项目来说是一件坏事,并且对性能有影响吗
- c - 当应用于链表中的指针时,AND 运算符是什么意思?