首页 > 解决方案 > Empty 不能在 rxjs 中完成 observable

问题描述

import { of, timer, EMPTY } from "rxjs"
import { concatMap, takeUntil, tap, switchMapTo } from "rxjs/operators"


of(1,2,3,4,5).pipe(
concatMap(val => {
    return timer(0, 1000).pipe(
        takeUntil(timer(3000)),
        switchMapTo(EMPTY)
    )
})
).subscribe()

此代码需要 15 秒才能运行。我想当返回 EMPTY 时,它会将可观察的计时器标记为完成并停止发射。并且那个 concatMap 应该立即接受下一个值。所以我的期望是它应该需要 0 秒。

编辑:

我的具体示例与轮询请求有关,如果我得到一个不好的响应就退出。本质上覆盖了指定的 takeUntil。

of([header1, header2, ...]).pipe(
concatMap(header => {
    const response1 = axios.get(...use header here)
    return timer(0, 1000).pipe(
        takeUntil(timer(3000)),
        switchMapTo(of(makeRequest(...based on response1))),
        // here i want to filter the values that the second request 
        // gets and make sure they are OK.
        // if they are not okay i want to get a new header, which 
        // leads 
        // to new response1 which leads to new makeRequests
        // so i want to essentially bail out before takeUntil if i get 
        //a bad response.
    )
}),
).subscribe(). 

编辑 2

我使用 takeUntil 和 takeWhile 解决了它,被触发的将覆盖另一个!

标签: javascriptrxjs

解决方案


这两个代码片段大致相同。

of(1,2,3,4,5).pipe(
  concatMap(_ =>
    timer(0, 1000).pipe(
      takeUntil(timer(3000)),
      switchMapTo(EMPTY)
    )
  )
).subscribe()
of(1,2,3,4,5).pipe(
  concatMap(_ =>
    timer(0, 1000).pipe(
      takeUntil(timer(3000)),
      filter(_ => false)
    )
  )
).subscribe()

switchMapTo(EMPTY):任何到达管道这一点的值都将被忽略并变成立即完成的流。

在 a 内时switchMap,完成不会导致switchMap完成。它只会等待另一个值来映射和一个新的流来订阅。

你想达到什么目的?


更新:关于timer

大多数情况下,这些将是相同的:

timer(0, 1000).pipe(
  takeUntil(timer(2100))
).subscribe(console.log);

timer(0, 1000).pipe(
  takeUntil(timer(3000))
).subscribe(console.log);

timer(3000)的第一次发射和timer(1,1000)的第四次发射在技术上落在同一毫秒。因为 JavaScript 不能保证这些延迟的管理精度,所以一些设置将采用第 4 个值,而一些(大多数)则不会。

甚至可能发生的事情取决于事件循环当前的繁忙程度。

如果您的实际代码使用该设置,我建议您take(number)改用。

timer(0, 1000).pipe(
  take(3)
).subscribe(console.log);

推荐阅读