首页 > 解决方案 > 延迟后重新执行异步 RxJS 流

问题描述

我正在使用 RxJS 6 使用类似于下面运行的示例的代码来懒惰地逐步遍历可迭代对象。这运作良好,但我无法解决我的最终用例。

完整代码在这里

import { EMPTY, defer, from, of } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";

function stepIterator (iterator) {
  return defer(() => of(iterator.next())).pipe(
    mergeMap(result => result.done ? EMPTY : of(result.value))
  );
}

function iterateValues ({ params }) {
  const { values, delay: delayMilliseconds } = params;
  const isIterable = typeof values[Symbol.iterator] === "function";

  // Iterable values which are emitted over time are handled manually. Otherwise
  // the values are provided to Rx for resolution.
  if (isIterable && delayMilliseconds > 0) {
    const iterator = values[Symbol.iterator]();

    // The first value is emitted immediately, the rest are emitted after time.
    return stepIterator(iterator).pipe(
      expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
    );
  } else {
    return from(values);
  }
}

const options = { 
  params: {
    // Any iterable object is walked manually. Otherwise delegate to `from()`.
    values: ["Mary", "had", "a", "little", "lamb"],
    // Delay _between_ values.
    delay: 350,
    // Delay before the stream restarts _after the last value_.
    runAgainAfter: 1000,
  }
};

iterateValues(options)
  // Is not repeating?!
  .pipe(repeat(3))
  .subscribe(
    v => {
      console.log(v, Date.now());
    },
    console.error,
    () => {
      console.log('Complete');
    }
  );

我想添加另一个选项,它将在延迟(runAgainAfter)后无限次重新执行流。result.done在没有深入考虑案例的情况下,我无法清晰地撰写此内容。到目前为止,我一直无法围绕iterateValues.

完成用例的最佳方法是什么?

谢谢!

编辑1:repeat只是打我的脸。也许这意味着友好。编辑2:不,重复不是重复,但可观察的正在完成。谢谢你的帮助。我很困惑。


对于后代,这里是修订版的完整代码示例 -repeat能够并在项目之间使用一致的延迟。

import { concat, EMPTY, defer, from, interval, of, throwError } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";

function stepIterator(iterator) {
  return defer(() => of(iterator.next())).pipe(
    mergeMap(result => (result.done ? EMPTY : of(result.value)))
  );
}

function iterateValues({ params }) {
  const { values, delay: delayMilliseconds, times = 1 } = params;
  const isIterable =
    values != null && typeof values[Symbol.iterator] === "function";

  if (!isIterable) {
    return throwError(new Error(`\`${values}\` is not iterable`));
  }

  // Iterable values which are emitted over time are handled manually. Otherwise
  // the values are provided to Rx for resolution.
  const observable =
    delayMilliseconds > 0
      ? defer(() => of(values[Symbol.iterator]())).pipe(
          mergeMap(iterator =>
            stepIterator(iterator).pipe(
              expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
            )
          )
        )
      : from(values);

  return observable.pipe(repeat(times));
}

标签: rxjs

解决方案


我会说实话,但肯定会有更好的解决方案。在我的解决方案中,我最终将延迟逻辑封装在自定义runAgainAfter运算符中。使其成为独立的部分,不会直接影响您的代码逻辑。

完整的工作代码在这里

runAgainAfter如果有人需要它的代码:

import { Observable } from "rxjs";

export const runAgainAfter = delay => observable => {
  return new Observable(observer => {
    let timeout;
    let subscription;
    const subscribe = () => {
      return observable.subscribe({
        next(value) {
          observer.next(value);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          timeout = setTimeout(() => {
            subscription = subscribe();
          }, delay);
        }
      });
    };
    subscription = subscribe();

    return () => {
      subscription.unsubscribe();
      clearTimeout(timeout);
    };
  });
};

希望对你有帮助 <3


推荐阅读