首页 > 解决方案 > 接收连接和消息请求并发出消息和连接状态更新的 Websocket 史诗

问题描述

我希望创建一个可与我的应用程序的其余部分分开的 redux-observable 史诗。它需要:

史诗需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新。它还需要能够发送和接收可以在其他地方处理的消息。

我最接近这个的是这个问题中提供的答案

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action =>
      Observable.webSocket('ws://localhost:8081')
        .map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
    );

但是我不确定如何扩展它以另外发出连接建立和断开连接的事件,并另外接受要发送到服务器的消息。

标签: websocketrxjsredux-observable

解决方案


一般来说,听起来你想要这样的东西:

(注意,这是未经测试的代码,但应该非常接近可运行)

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action => {
      // Subjects are a combination of an Observer *and* an Observable
      // so webSocket can call openObserver$.next(event) and
      // anyone who is subscribing to openObserver$ will receive it
      // because Subjects are "hot"
      const openObserver$ = new Subject();
      const openObserver$ = new Subject();

      // Listen for our open/close events and transform them
      // to redux actions. We could also include values from
      // the events like event.reason, etc if we wanted
      const open$ = openObserver$.map((event) => ({
        type: 'SOCKET_CONNECTED'
      }));
      const close$ = openObserver$.map((event) => ({
        type: 'SOCKET_DISCONNECTED'
      }));

      // webSocket has an overload signature that accepts this object
      const options = {
        url: 'ws://localhost:8081',
        openObserver: openObserver$,
        closeObserver: openObserver$
      };
      const msg$ = Observable.webSocket(options)
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
        .catch(e => Observable.of({
          type: 'SOCKET_ERROR',
          payload: e.message
        }))

      // We're merging them all together because we want to listen for
      // and emit actions from all three. For good measure I also included
      // a generic .takeUntil() to demonstrate the most obvious way to stop
      // the websocket (as well as the open/close, which we shouldn't forget!)
      // Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
      // or also a SOCKET_ERROR because we want to stop subscribing
      // to open$/close$ if there is an error.  
      return Observable.merge(open$, close$, msg$)
        .takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
    });

如果这个史诗需要一次支持多个套接字,您将需要想出某种唯一标识特定连接的方法,并修改代码以基于此过滤信号。例如

.takeUntil(
  action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
    .filter(action => action.someHowHaveId === someHowHaveId)
);

推荐阅读