reactive-programming - 反应式中有 ThrottleWhile 或 SubscribeAsync 之类的东西吗?
问题描述
我有一个异步订阅逻辑,当它运行时,我不希望它再次运行,直到该逻辑完成。但是如果在逻辑运行时观察到任何事件,它将被限制直到订阅逻辑完成,然后立即再次触发
所以我希望我能写
observable.SubscribeAsync(async(data) => await DoSomething(data))`
或者更准确地说。我想限制输入数据,而不是跳过或等待,而是限制它。所以我认为如果我们有 ThrottleWhile 可能会更好(也许还有 DebounceWhile 或 ThrottleUntil)
bool running = false;
observable.ThrottleWhile((_) => running).Subscribe((data) => {
try
{
running = true;
await DoSomething(data);
}
finally
{
running = false;
}
})`
这已经可能了吗?我怎样才能以反应模式制造这个流程?
解决方案
我没有跟上图书馆的步伐,因此可能添加了一个内置运算符,但我不知道我使用的版本中有一个。这是您所要求的基本版本:
static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext,
Action<Exception> onError, Action onComplete)
{
//argument error checking omitted for brevity
T current = default(T);
bool processing = false;
bool haveCurrent = false;
return source
.Where((v) =>
{
if (processing)
{
current = v;
haveCurrent = true;
}
return !processing;
})
.Subscribe((v) =>
{
Action<Task> runNext = null;
runNext = (task) =>
{
if (haveCurrent)
{
haveCurrent = false;
onNext(current).ContinueWith(runNext);
}
else
{
processing = false;
}
};
processing = true;
onNext(v).ContinueWith(runNext);
},
onError,
onComplete);
}
此实现需要考虑的一些注意事项:
如果最后一项和完成发生在前一项正在处理时,则最后一次调用 onNext 可以在调用 onComplete 之后发生。同样,在 onNext 任务运行时也可以调用 onComplete。如果您的源永远不会完成,这可能不是问题。如果这是一个问题,您必须决定是要延迟对 onComplete 的调用,直到处理完最后一项,还是取消处理最后一项并进行相应调整。
我没有尝试测试在处理运行完成时项目正确进入时可能会或可能不会发生的任何竞争条件。
推荐阅读
- javascript - 是否可以获取或存储以前的重定向 url?
- javascript - Vanilla JS Web 组件包括和
- git - `git pull --rebase --autostash` 和 `git pull --ff-only` 之间的区别?
- postgresql - 如果我需要使用一些可能出现在停用词中的单词,如何更正填写同义词词典
- select - 如果我不使用完整密钥,单记录缓冲是否有效?
- asp.net-core - 使用身份验证在 Blazor 服务器端下载文件
- mysql - 将表两列连接为 1 列
- php - Laravel Eloquent 与不同外键的关系
- angular - 当 selectedIndex 异步更改时,mat-dialog 上的 Mat-tabs 不起作用
- c# - 创建 visio 文件可以从控制台应用程序工作,但不能从 Windows 服务工作