首页 > 解决方案 > 是否可以在每个可观察到的完成上发出值,这些完成将使用单管道发送给订阅者?

问题描述

我需要按声明的顺序调用多个 Web 请求。每个连续请求都取决于前一个请求的结果。我想在每个请求完成后发出将发送给订阅者的事件。

现在我正在“挖掘”每个请求的值并使用单独Subject的 . 是否可以使用带有操作员的单管来做到这一点?

这是代码示例

fromEvent(pauseButton, "click")
  .pipe(
    tap(()=>{
      subscribedLabel.innerHTML="";
      tapedLabel.innerHTML="";
    }),
    tap(v => (tapedLabel.innerHTML = "started")),
    concatMapTo(of("phase 1 completed").pipe(delay(1000))),
    tap(v => (tapedLabel.innerHTML = v)),
    concatMapTo(of("phase 2 completed").pipe(delay(1000))),
    tap(v => (tapedLabel.innerHTML = v))
    )
  .subscribe(v => {
    console.log(v);
    subscribedLabel.innerHTML = v;
  });

https://stackblitz.com/edit/typescript-6jza7h?file=index.ts

预期的结果是subscribedLabel.innerHTML会以同样的方式改变tapedLabel.innerHTML

标签: rxjs

解决方案


目前尚不清楚您在追求什么,但这是一种您可以使用 4 个连续调用并将所有响应累积到一个对象中的方法。

function fakeHTTP(resW): Observable<string> {
  return of(resW).pipe(delay(1000))
}

fromEvent(button, "click").pipe(
  concatMap(_ => 
    fakeHTTP(1).pipe(
      map(res => ({first: res}))
    )
  ),
  tap(_ => console.log("First Request Complete")),
  concatMap(first => 
    fakeHTTP(2).pipe(
      map(res => ({...first, second: res}))
    )
  ),
  tap(_ => console.log("Second Request Complete")),
  concatMap(second => 
    fakeHTTP(3).pipe(
      map(res => ({...second, third: res}))
    )
  ),
  tap(_ => console.log("Third Request Complete")),
  concatMap(third => 
    fakeHTTP(4).pipe(
      map(res => ({...third, fourth: res}))
    )
  ),
  tap(_ => console.log("Fourth Request Complete"))
).subscribe(console.log);

其输出如下:

// Wait 1s
First Request Complete
// Wait 1s
Second Request Complete
// Wait 1s
Third Request Complete
// Wait 1s
Fourth Request Complete
{"first":1,"second":2,"third":3,"fourth":4} // <- Value sent to subscribe

更新 #1:将值向上传递到调用链

您可以将值向上传递调用链,但它会变得有点复杂。您希望每个步骤仅处理上一步中的值,但忽略(发出未更改的)来自更上链的值。

您可以这样做的一种方法是标记每个响应。我用一个pass可以是真或假的标志来做到这一点。最后的操作是删除标志。

看起来是这样的:

function fakeHTTP(resW): Observable<string> {
  return of(resW).pipe(delay(1000))
}

fromEvent(button, "click").pipe(
  concatMap(_ => 
    fakeHTTP(1)
  ),
  tap(_ => console.log("First Request Complete")),
  concatMap(first => 
    fakeHTTP(2).pipe(
      map(res => ({pass: false, payload: res})),
      startWith({pass: true, payload: first})
    )
  ),
  tap(({pass}) => {
    if(!pass) console.log("Second Request Complete")
  }),
  concatMap(second => second.pass ? 
    of(second) :
    fakeHTTP(3).pipe(
      map(res => ({pass: false, payload: res})),
      startWith({...second, pass: true})
    )
  ),
  tap(({pass}) => {
    if(!pass) console.log("Third Request Complete")
  }),
  concatMap(third => third.pass ?
    of(third) :
    fakeHTTP(4).pipe(
      map(res => ({pass: false, payload: res})),
      startWith({...third, pass: true})
    )
  ),
  tap(({pass}) => {
    if(!pass) console.log("Second Request Complete")
  }),
  map(({payload}) => payload)
).subscribe(console.log);

其输出如下:

// Wait 1s
First Request Complete // <- console log from tap
1 // <- console log from subscribe
// Wait 1s
Second Request Complete // <- console log from tap
2 // <- console log from subscribe
// Wait 1s
Third Request Complete // <- console log from tap
3 // <- console log from subscribe
// Wait 1s
Second Request Complete // <- console log from tap
4 // <- console log from subscribe

更新#2:当递归是可能的

您还可以进行递归调用,其中每个新调用都依赖于前一个调用,并且某些基本情况会结束递归。RxJS jasexpand作为一种内置的递归方式。

在此示例中,每个新调用都直接fakeHTTP使用前一次调用发出的值。

function fakeHTTP(resW): Observable<string> {
  return of(resW).pipe(delay(1000))
}

fromEvent(button, "click").pipe(
  map(_ => 1),
  expand(proj => proj < 4 ?
    fakeHTTP(++proj) :
    EMPTY
  )
).subscribe(console.log);

其输出如下:

// Wait 1s
1
// Wait 1s
2
// Wait 1s
3
// Wait 1s
4

更新 #3:单独的 observables

function fakeHTTP(resW): Observable<string> {
  return of(resW).pipe(delay(1000))
}

const first$ = fromEvent(button, "click").pipe(
  concatMap(_ => fakeHTTP(1)),
  share()
);
const second$ = first$.pipe(
  concatMap(first => fakeHTTP(2)),
  share()
);
const third$ = second$.pipe(
  concatMap(second => fakeHTTP(3)),
  share()
);
const fourth$ = third$.pipe(
  concatMap(third => fakeHTTP(4))
);

merge(
  first$, 
  second$, 
  third$, 
  fourth$
).subscribe(console.log);

这是另一种更烦人的方式来编写几乎完全相同的内容。

function fakeHTTP(resW): Observable<string> {
  return of(resW).pipe(delay(1000))
}

fromEvent(button, "click").pipe(
  map(_ => fakeHTTP(1).pipe(
    share(),
  )),
  map(first$ => ([first$.pipe(
    concatMap(firstR => fakeHTTP(2)),
    share()
  ), first$])),
  map(([second$, ...tail]) => ([second$.pipe(
    concatMap(secondR => fakeHTTP(3)),
    share()
  ),second$, ...tail])),
  map(([third$, ...tail]) => ([third$.pipe(
    concatMap(thirdR => fakeHTTP(4))
  ),third$, ...tail])),
  concatMap(calls => merge(...calls))
).subscribe(console.log);

推荐阅读