首页 > 解决方案 > Rxjs 实现实时邮件

问题描述

我的代码非常简单:
在页面加载时,我加载了用户的消息,然后我想合并所有新的传入消息。

   this.me$ = this.store
        .pipe(map((state: AppState) => state.auth));

    const allMessages$ = this.me$
        .pipe(mergeMap((me: User) => this.messageService.getMessage$(me.username)));

    const allNewMessages$: Observable<Message[]> = this.me$
        .pipe(mergeMap((me: User) => this.messageService.newMessage$(me.username)))
        .pipe(scan((messages: Message[], newMessage: Message) => messages.concat(newMessage), []))
        .pipe(startWith([]))

    this.messages$ = combineLatest([allMessages$, allNewMessages$])
        .pipe(map(([allMessages, allNewMessages]) => allMessages.concat(allNewMessages)))
        .pipe(map((messages: Message[]) => [...messages].sort((d1, d2) => new Date(d1.date) < new Date(d2.date) ? -1 : 1)));

问题是当新消息发射时, allNewMessages$ 可观察到的发射如下:

events[m1, m2, m3] -> emitted[[m1], [m1, m1, m2], [m1, m1, m2, m1, m1, m2, m3]]

我想要类似的东西:

events[m1, m2, m3] -> emitted[[m1], [m1, m2], [m1, m2, m3]]

谢谢你

标签: javascripttypescriptrxjsreactive-programming

解决方案


我终于找到了解决方案。我必须以这种方式替换我mergeMapswitchMapHttp observable 并避免多次混乱调用。


推荐阅读