首页 > 解决方案 > Angular NGRX RXJS - 订阅其他可观察的内部效果不起作用

问题描述

我有一个问题,我有一个管理 SSE 的服务,我编写了这个服务:

    getAllNotify(url: string): Observable<EventDTO> {
      return new Observable(observer => {
        const eventSource = this.getEventSource(url);

        eventSource.onopen = function () {
          console.log('connection for all data is established');
        };
        eventSource.onmessage = event => {
          this.zone.run(() => {
            if (eventSource.readyState !== 0) {
              observer.next(JSON.parse(event.data));
            }
          });
        };
        eventSource.onerror = error => {
          this.zone.run(() => {
            observer.error(error);
          });
        };
      });
    }

    private getEventSource(url: string): EventSource {
      return new EventSource(url);
    }

app-Component我调度一个动作以启动由下面的效果监听的 SSE 连接。

在这个效果中,我需要从服务器获取响应,然后我需要订阅我的 ngrx 商店中的其他 observable。

我正在尝试与withLatestFrom操作员一起做,但它只适用于第一次;服务器继续发送数据,服务工作但我没有响应效果。如果我删除withLatestFrom操作员,我会解决所有问题,它可以正常工作,但我需要订阅其他可观察对象才能在函数内部进行一些检查。我错了什么?我不能在效果中使用其他 RXJS 运算符?

  loadEVents3$ = createEffect(() =>
    this.actions$.pipe(
      ofType('[Websocketevent] Load Websocketevents'),
      mergeMap(() =>
        this.webSocketEventsService.getAllNotify(`${environment.SERVER_API_URL}/services/my-api-events/api/event`).pipe(
          map(res => {
            this.eventDto = res;
          }),
          withLatestFrom(
            this.idAccountSelector$,
            this.idCompanySelector$,
          ),
          map(res => {
            let idAccount = res[0];
            let idCompany = res[1];

            if (this.eventDto.eventType === EventType.FOLDER_NOTE) {
              if (this.eventDto.operationType === OperationType.CREATE) {
                return { type: noteEventsActions.ActionNotesType.LOAD_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
              } else if (this.eventDto.operationType === OperationType.UPDATE) {
                return { type: noteEventsActions.ActionNotesType.UPDATE_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
              } else if (this.eventDto.operationType === OperationType.DELETE) {
                return { type: noteEventsActions.ActionNotesType.DELETE_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
              }
            }
  
          }),
          catchError(() => EMPTY)
        )
      )
    )
  );

标签: angulartypescriptrxjsngrx

解决方案


我正在尝试使用 withLatestFrom 运算符来执行此操作,但它仅适用于第一次。

我很惊讶你第一次写的作品。我认为不应该。

一个小问题:

map(res => {
  this.eventDto = res;
}),

就 RxJS 而言,

map(_ => {})

没有明确定义的行为。它会将每个响应转换(映射)undefined为. 这可能不是你想要的。也许tap运营商会更好。在那里,您可以在不修改流的情况下创建副作用。

下面的地图和水龙头大致相当:

map(res => {
  this.eventDto = res;
  return res;
})

tap(res => {
  this.eventDto = res;
})

您的第二个地图运算符也不总是有返回结果。你会在那里看到同样的问题。


一个大问题:

withLatestFrom增加源流。

考虑一下这个和你的有点相似的设置

interval(7000).pipe(

  withLatestFrom(
    interval(5000),
    interval(3000)
  ),
  map(vs => vs.map(v => v + 1))

).subscribe(([i7s, i5s, i3s]) => {
  console.log(`Source Interval has emitted ${i7s} times`);
  console.log(`withLatestFrom 5 second Interval has emitted ${i5s} times`);
  console.log(`withLatestFrom 3 second Interval has emitted ${i3s} times`);
});

注意 map 和 subscribe 是如何得到一个 thruple 的(一个 3 值数组)?

您的代码将错误的变量分配给结果的错误部分。let idAccount = res[0];应始终未定义。因此,这应该会引发错误,然后您将忽略该错误,整个过程将停止。

你看到的大概是哪个?很难说。

寻求解决方案:

您没有给我们一个最小的工作示例,因此我提出的任何解决方案(使用您的代码)都是我无法真正测试的解决方案。即使是这样!也许尝试这样的事情:

loadEVents3$ = createEffect(() =>
  this.actions$.pipe(
    ofType('[Websocketevent] Load Websocketevents'),
    mergeMap(() => this.webSocketEventsService.getAllNotify(
        `${environment.SERVER_API_URL}/services/my-api-events/api/event`
      ).pipe(
        tap(res => this.eventDto = res),
        filter(eventDto => eventDto.eventType === EventType.FOLDER_NOTE),
        withLatestFrom(
          this.idAccountSelector$,
          this.idCompanySelector$,
        ),
        map(([eventDto, idAccount, idCompany]) => {

          // Because of the filter above, we KNOW that
          // eventDto.eventType === EventType.FOLDER_NOTE is true here.

          let type = null;

          if (eventDto.operationType === OperationType.CREATE) {
            type = noteEventsActions.ActionNotesType.LOAD_WEB_SOCKET_EVENTS_NOTES;
          } else if (eventDto.operationType === OperationType.UPDATE) {
            type = noteEventsActions.ActionNotesType.UPDATE_WEB_SOCKET_EVENTS_NOTES;
          } else if (this.eventDto.operationType === OperationType.DELETE) { 
            type = noteEventsActions.ActionNotesType.DELETE_WEB_SOCKET_EVENTS_NOTES;
          }

          // Looks like you never use idAccount or idCompany???
          
          return {
            type,
            payload: eventDto 
          };

        }),
        catchError(() => EMPTY)
      )
    )
  )
);

推荐阅读