websocket - 接收连接和消息请求并发出消息和连接状态更新的 Websocket 史诗
问题描述
我希望创建一个可与我的应用程序的其余部分分开的 redux-observable 史诗。它需要:
- 侦听 的传入操作
{ type: "SOCKET_TRY_CONNECT" }
,这也应该在连接时忽略任何其他 SOCKET_TRY_CONNECT 事件。另外收听要发送的消息,也许{ type: "SOCKET_MESSAGE_SEND", data }
- 发出传出动作
{ type: "SOCKET_CONNECTED" }
,{ type: "SOCKET_DISCONNECTED", error }
和{ type: "SOCKET_MESSAGE_RECEIVE", data }
史诗需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新。它还需要能够发送和接收可以在其他地方处理的消息。
我最接近这个的是这个问题中提供的答案:
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action =>
Observable.webSocket('ws://localhost:8081')
.map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
);
但是我不确定如何扩展它以另外发出连接建立和断开连接的事件,并另外接受要发送到服务器的消息。
解决方案
一般来说,听起来你想要这样的东西:
(注意,这是未经测试的代码,但应该非常接近可运行)
const somethingEpic = action$ =>
action$.ofType('START_SOCKET_OR_WHATEVER')
.switchMap(action => {
// Subjects are a combination of an Observer *and* an Observable
// so webSocket can call openObserver$.next(event) and
// anyone who is subscribing to openObserver$ will receive it
// because Subjects are "hot"
const openObserver$ = new Subject();
const openObserver$ = new Subject();
// Listen for our open/close events and transform them
// to redux actions. We could also include values from
// the events like event.reason, etc if we wanted
const open$ = openObserver$.map((event) => ({
type: 'SOCKET_CONNECTED'
}));
const close$ = openObserver$.map((event) => ({
type: 'SOCKET_DISCONNECTED'
}));
// webSocket has an overload signature that accepts this object
const options = {
url: 'ws://localhost:8081',
openObserver: openObserver$,
closeObserver: openObserver$
};
const msg$ = Observable.webSocket(options)
.map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
.catch(e => Observable.of({
type: 'SOCKET_ERROR',
payload: e.message
}))
// We're merging them all together because we want to listen for
// and emit actions from all three. For good measure I also included
// a generic .takeUntil() to demonstrate the most obvious way to stop
// the websocket (as well as the open/close, which we shouldn't forget!)
// Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
// or also a SOCKET_ERROR because we want to stop subscribing
// to open$/close$ if there is an error.
return Observable.merge(open$, close$, msg$)
.takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
});
如果这个史诗需要一次支持多个套接字,您将需要想出某种唯一标识特定连接的方法,并修改代码以基于此过滤信号。例如
.takeUntil(
action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
.filter(action => action.someHowHaveId === someHowHaveId)
);
推荐阅读
- asp.net - 如何将请求时间设置为几个小时 - IIS 和 ASP:NET
- kubernetes - PriorityClass 不会将其值填充到 podSpec
- php - 如何根据php mysql中的id更新一行的总和
- c# - 特定目的地的默认映射
- mongodb - PowerShell Invoke-Command 未正确传递参数
- c# - IDbCommandInterceptor 的工作负载影响
- nginx - 如何为 2 个不同的站点正确配置 NGINX?
- linux - 如何在linux中递归更改文件夹/文件标志?
- python - 在 django 应用程序中使用 shell 命令将输出文件保存在磁盘上
- javascript - Require 似乎导致找不到存在的模块的模块