首页 > 解决方案 > 使用 RxJS 6,我如何从观察到的数组中重新发出值,每个/所有值之间有延迟?

问题描述

我想定期轮询一个 API,该 API 将返回一个记录数组,这些记录的大小几乎肯定会有所不同。我希望每条记录都显示在一个 CSS 动画中,这需要花费t时间。因此,我必须缓冲 API 响应并单独释放它们,频率不超过t,以便动画能够顺利完成。

经过大量搜索和反复试验,我将这个自定义 RxJS 运算符(在无框架 TypeScript 中)放在一起。然而,双重/嵌套concatMap有一点代码味道。有没有更优雅或更被动的解决方案?所有内部 observables 是否管理得当(取消订阅)?

(这是我的第一个自定义运算符,因此欢迎任何其他反馈。)

export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> {
    // inner function automatically receives source observable
    return (source: Observable<IRecord[]>) => {
        return source.pipe(
            // First, explode the array to show one at a time.
            concatMap((records: IRecord[]) => from(records).pipe(
                // Now, for each array value, add a delay
                concatMap((record: IRecord) => of(record).pipe(
                    delay(t),
                ))
                tap((record: IRecord) => {
                    // executes once per record, no faster than every t
                }),
            )),
            tap((record: IRecord) => {
                // alternative also executes once per record, no faster than every t
            }),
        );
    };
}
MyApi.doPolling().pipe(
    recordPace(),
    recordAnimate(),
).subscribe(
    () => {},
    () => {},
    () => { console.log('done'); }
);

标签: typescriptrxjsreactive-programming

解决方案


如果要爆炸数组,可以使用mergeAll().

export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> {
  return (source: Observable<IRecord[]>) => {
    return source.pipe(
      // concatMap((records: IRecord[]) => from(records).pipe(
      mergeAll(),
      concatMap((record: IRecord) => of(record).pipe(
        delay(t),
      ))
      tap((record: IRecord) => {
        // executes once per record, no faster than every t
      }),
      // )),
      tap((record: IRecord) => {}),
    );
  };
}

您可以使用任何mergeAll, switchAll,concatAll运算符。这是因为如果您只想爆炸数组,则不涉及异步操作。上面提到的每个运算符都接受一个返回一个函数的函数ObservableInput

export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;

正如你所看到的,一个数组可以被认为是一个ObservableInput,并且行为是如果你的回调函数返回一个数组,它将单独发送它的每个项目。

另外,祝贺您的第一个自定义运算符!


推荐阅读