首页 > 解决方案 > RXJS 链依赖的 observables 顺序,但获取进度条的每个发射

问题描述

我遇到了一个问题,我一直在尝试使用 找到解决方案RxJs,但似乎找不到适合它的解决方案...

这是我的想法:

考虑这个非常简化的版本。假设每个都of代表一个完整的 REST 成功请求(稍后将处理错误),并且我将使用n参数做未显示的工作......

const request1 = of('success 1').pipe(
  delay(500),
  tap(n => console.log('received ' + n)),
);

const request2 = (n) => of('success 2').pipe(
  delay(1000),
  tap(n => console.log('received ' + n))
);

const request3 = (n) => of('success 3').pipe(
  delay(400),
  tap(n => console.log('received ' + n))
);

request1.pipe(
  concatMap(n => request2(n).pipe(
    concatMap(n => request3(n))
  ))
)

但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道会解决这个问题。

因此concatMap(),我可以正确链接我的依赖 REST 调用,但无法跟踪进度。

虽然我可以通过嵌套订阅很容易地跟踪进度,但我正在努力避免这种情况并使用最佳实践方式。

我怎样才能链接我的依赖 REST 调用,但每次调用成功时仍然能够做一些事情?

标签: javascriptrxjspipeobservableconcatmap

解决方案


这是一个通用的解决方案,虽然不是那么简单。但它确实在避免share操作符的同时取得了可观察的进展,如果使用不当,可能会引入意外的状态。

const chainRequests = (firstRequestFn, ...otherRequestFns) => (
  initialParams
) => {
  return otherRequestFns.reduce(
    (chain, nextRequestFn) =>
      chain.pipe(op.concatMap((response) => nextRequestFn(response))),
    firstRequestFn(initialParams)
  );
};

chainRequests接受可变数量的函数并返回一个接受初始参数的函数并返回一个可观察到concatMaps的函数,如问题中手动显示的那样。它通过将每个函数简化为恰好是可观察的累积值来实现这一点。

请记住,如果我们知道路径,RxJS 会带领我们走出回调地狱。

const chainRequestsWithProgress = (...requestFns) => (initialParams) =>  {
  const progress$ = new Rx.BehaviorSubject(0);
  const wrappedFns = requestFns.map((fn, i) => (...args) =>
    fn(...args).pipe(op.tap(() => progress$.next((i + 1) / requestFns.length)))
  );
  const chain$ = Rx.defer(() => {
    progress$.next(0);
    return chainRequests(...wrappedFns)(initialParams);
  });
  return [chain$, progress$];
};

chainRequestsWithProgress返回两个 observables - 一个最终发出最后一个响应,一个在订阅第一个 observable 时发出进度值。我们通过创建一个BehaviorSubject作为我们的进度值流来做到这一点,并包装我们的每个请求函数以返回它们通常会返回的相同的可观察对象,但我们也将它通过管道传递给它,tap以便它可以将一个新的进度值推送到BehaviorSubject.

每次订阅第一个 observable 时,进度都会归零。

如果你想返回一个产生进度状态和最终结果值的 observable,你可以chainRequestsWithProgress改为返回:

chain$.pipe(
  op.startWith(null),
  op.combineLatest(progress$, (result, progress) => ({ result, progress }))
)

并且您将有一个可观察的对象,该对象会发出一个对象,该对象代表最终结果的进度,然后是该结果本身。值得深思 -progress$必须只发出数字吗?

警告

这假设 request observables 只发出一个值。


推荐阅读