首页 > 解决方案 > 尽快订阅第二个 observable,但等待第一个发出 - Angular / RxJS

问题描述

我的情况是,我想尽快订阅两个不同的 observable,但在第二个发出之前等待第一个发出。

例如,我正在开发一个聊天应用程序,我想检索存储在数据库中的所有以前的消息,但也订阅一个发出新消息的流。

我想要这样的东西:

readonly messages$ = this.service.getStoredMessages().pipe(
    switchMapTo(this.service.getNewMessages())
)

这样做的问题是我可能会因为延迟订阅而错过一些消息getNewMessages()。我需要做的是尽快订阅这两个 observables,但第二个 ( getNewMessages()) 应该只在发出后才getStoredMessages()发出。

标签: angularrxjs

解决方案


假如说:

  • storedMessges$类型是Observable<Message[]>
  • 那种getMessages$类型是Observable<Message>
  • 这种messages$类型应该是Observable<Message>

我认为你是你想要做的事情或多或少是这样的:

readonly messages$ = defer(() => {
  const storedMessages$ = this.service.getStoredMessages().pipe(share())
  const newMessages$ = this.service.getNewMessages().pipe(share())

  const missedMessages$ = newMessages$.pipe(
    takeUntil(storedMessages$),
    toArray(),
  )

  const initialMessages$ = merge(
    storedMessages$,
    missedMessages$
  ).pipe(mergeAll()) // instead of `mergeAll` maybe you want to `scan` and make sure that there are no duplicate values...

  return merge(
    newMessages$.pipe(ignoreElements()), // it ensures that we don't unbscribe from newMessages$ after storedmessages$ emits/completes.
    concat(initialMessages$, newMessages$),
  )
})

推荐阅读