首页 > 解决方案 > 反应式中有 ThrottleWhile 或 SubscribeAsync 之类的东西吗?

问题描述

我有一个异步订阅逻辑,当它运行时,我不希望它再次运行,直到该逻辑完成。但是如果在逻辑运行时观察到任何事件,它将被限制直到订阅逻辑完成,然后立即再次触发

所以我希望我能写

observable.SubscribeAsync(async(data) => await DoSomething(data))`

或者更准确地说。我想限制输入数据,而不是跳过或等待,而是限制它。所以我认为如果我们有 ThrottleWhile 可能会更好(也许还有 DebounceWhile 或 ThrottleUntil)

bool running = false;
observable.ThrottleWhile((_) => running).Subscribe((data) => {
    try
    {
        running = true;
        await DoSomething(data);
    }
    finally
    {
        running = false;
    }
})`

这已经可能了吗?我怎样才能以反应模式制造这个流程?

标签: reactive-programmingsystem.reactiveunirx

解决方案


我没有跟上图书馆的步伐,因此可能添加了一个内置运算符,但我不知道我使用的版本中有一个。这是您所要求的基本版本:

static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext,
                                     Action<Exception> onError, Action onComplete)
{
    //argument error checking omitted for brevity
    T current = default(T);
    bool processing = false;
    bool haveCurrent = false;

    return source
           .Where((v) =>
                  {
                      if (processing)
                      {
                          current = v;
                          haveCurrent = true;
                      }
                      return !processing;
                  })
           .Subscribe((v) =>
                      {
                          Action<Task> runNext = null;
                          runNext = (task) =>
                              {
                                  if (haveCurrent)
                                  {
                                      haveCurrent = false;
                                      onNext(current).ContinueWith(runNext);
                                  }
                                  else
                                  {
                                      processing = false;
                                  }
                              };
                          processing = true;
                          onNext(v).ContinueWith(runNext);
                      },
                      onError,
                      onComplete);
}

此实现需要考虑的一些注意事项:

如果最后一项和完成发生在前一项正在处理时,则最后一次调用 onNext 可以在调用 onComplete 之后发生。同样,在 onNext 任务运行时也可以调用 onComplete。如果您的源永远不会完成,这可能不是问题。如果这是一个问题,您必须决定是要延迟对 onComplete 的调用,直到处理完最后一项,还是取消处理最后一项并进行相应调整。

我没有尝试测试在处理运行完成时项目正确进入时可能会或可能不会发生的任何竞争条件。


推荐阅读