首页 > 解决方案 > 从 WebSocketSubject 反序列化 Blob 对象

问题描述

我正在尝试使用WebSocketSubject. rxjs我已经将大部分基本代码设置为 Angular 服务,如下所示:

import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Subscription, from, Observable } from 'rxjs';

@Injectable({
  providedIn: 'root'
})
export class WebsocketsApiService {
  connection: WebSocketSubject<any>;

  constructor() { 
    this.connection = webSocket({
      url: <websocket URL>,
      closeObserver: {
        next: () => console.log('Sockets disconnected.')
      },
      openObserver: {
        next: () => console.log('Socket connected.')
      }
    });
  }

  connect(observer: (value: any) => void): Subscription {
    return this.connection.subscribe(observer);
  }

  send(data: any) {
    if (this.connection) {
      this.connection.next(data);
    } else {
      console.error('Websockets not connected');
    }
  }

  close() {
    if (this.connection) {
      this.connection.complete();
      this.connection = null;
    }
  }

  ngOnDestroy() {
    this.close();
  }
}

但是,使用此基本代码,它无法反序列化来自 websocket 连接的任何数据。调试后,我注意到它以 . 的形式接收数据Blob,这就是默认解串器功能JSON.parse(data)失败的原因。

我尝试通过将Blob数据获取为字符串来滚动一个简单的自定义反序列化器函数,但事实证明这Blob.text()是一个 Promise,这是我被卡住的部分:

constructor() { 
    this.connection = webSocket({
      /// ...
      deserializer: (e) => e.data.text()
    });
}

因此,使用此函数作为反序列化器,订阅此 Websocket 服务的每个组件都将获得一个字符串的 Promise,这会将解析 Promise 的责任推迟给组件。

问题:

  1. 这种当前的方法是否理想?让每个订阅组件自己解决承诺?
  2. 有没有更简洁的方法,最好是 websocket 订阅者立即解析 JSON 对象,而不是字符串订阅者必须单独解析?

标签: angularwebsocketrxjs

解决方案


mergeAll/mergeMap您可以使用, concatAll/concatMap,映射到内部 Promise 的值switchAll/switchMap。您可能希望concatAll/concatMap按照它们进入的顺序发出值。

constructor() { 
  this.connection = webSocket({
    /// ...
    deserializer: (e) => e.data.text()
  });
}

connect(observer: (value: any) => void): Subscription {
  return this.connection.pipe(
    concatAll()
  ).subscribe(observer);
}

或者

constructor() { 
  this.connection = webSocket({
    /// ...
    deserializer: ({data}) => data
  });
}

connect(observer: (value: any) => void): Subscription {
  return this.connection.pipe(
    concatMap(data => data.text())
  ).subscribe(observer);
}

推荐阅读