首页 > 解决方案 > 需要为数组的每个元素执行异步函数,但仅调度带有进度的操作

问题描述

我是 RxJS 的新手,仍在尝试弄清楚如何使用它来实现不同的功能。我需要有关实现可观察的帮助,尝试了很多方法,但似乎都没有奏效。

我有这个功能:

export function automateParameterEdit(tunId) {
  const progress$ = new Subject();

  const process$ = defer(async () => {
    const tun = await updateStatus(tunId, 'autoTun');
    progress$.next({ ...tun , progress: '0' });
    return { rules: tun.rules, tun };
  }).pipe(
    flatMap(({ rules, tun }) =>
      from(Object.values(rules)).pipe(
        concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId)),
        scan((acc, curr) => acc + 1, 0),
        map(progress => {
          progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });
        }),
        catchError(e => {
          // whatever
        }),
        finalize(async () => {
          // whatever
        })
      )
    )
  );

  return merge(progress$, process$);
}

所以,现在,动作被分派了两次,一次是因为progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });发出新的 tun 进度,第二次我相信是因为执行:concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId))

假设有 4 条规则 ( Object.values(rules).length === 4)。在控制台中,我看到调度了 4 x 2 = 8 个动作,其中一半的有效负载无效。

我想要做的是执行autoEditParameters(tunId, rule.ruleId, tun.rulesetId)which btw 是异步的,并且在每次执行后我想发出进度(progress$.next({ ...tun, progress: progress / Object.values(rules).length * 100 });)。

如何停止调度无效操作并仅执行异步autoEditParameters和调度进度?

标签: javascriptrxjsobservablereactive-programmingredux-observable

解决方案


你不需要一个Subject

当您需要“手动”通过流推送值时,您只需要一个主题。但是,在您的情况下,您只想将 ( map) 排放修改为不同的形状。

所以,你可以摆脱这个主题。无需process$progress$;合并 你可以简单地返回progress$

function automateParameterEdit(tunId) {
  const process$ = defer(async () => {
    const tun = await updateStatus(tunId, 'autoTun');
    return { rules: tun.rules, tun };
  }).pipe(
    flatMap(({ rules, tun }) =>
      from(Object.values(rules)).pipe(
        concatMap(rule => autoEditParameters(tunId, rule.ruleId, tun.rulesetId)),
        scan((acc, curr) => acc + 1, 0),
        map(progress => {
          return { ...tun, progress: progress / Object.values(rules).length * 100 };
        })
      )
    )
  );

  return process$;
}

以下是几个 StackBlitz 示例:

每次执行后我想发出进度

不确定您是否只是想发出数字百分比(而不是对象),但这很容易做到。有时将其分解为更小的函数可以更容易理解:

function automateParameterEdit(tunId): Observable<number> {
  return updateTun(tunId).pipe(
    flatMap(processRules)
  );
}

function updateTun(tunId): Observable<Tun> {
  return defer(async () => updateStatus(tunId, 'autoTun'))
}

function processRules(tun: Tun): Observable<number> {
  return from(tun.rules).pipe(
    concatMap(rule => autoEditParameters(tun.id, rule.ruleId, tun.rulesetId)),
    scan(acc => acc + 1, 0),
    map(doneCount => doneCount / tun.rules.length * 100),
    startWith(0),
  )
}

在这里,updateTun()只是包装了 async 函数并返回一个 observable,所以它会在订阅时执行。

processRules()接受 aTun并返回一个Observable<number>进度百分比。startWith只是发出一个初始值0.

StackBlitz


推荐阅读