rxjs - 如何使用 ws ( Websockets ) 和 RxJs 在 NestJS 中连接一个长时间运行的进程
问题描述
我想在函数中运行代码 - 然后作为 websocket Observable 返回。有效监控长时间运行的进程。我不知道如何通过这种格式的 websocket 正确返回值。
我的长期运行过程:(显然实际上不会花费很长时间)
import { Observable } from 'rxjs';
export function longRunningProcess (): Observable<unknown> {
return new Observable(subscriber => {
subscriber.next('End of step 1');
subscriber.next('End of step 2');
subscriber.next('End of step 3');
setTimeout(() => {
subscriber.next('End of Step 4');
subscriber.complete();
}, 1000);
});
}
返回到 ws ( Websocket ) 的我的 NestJS 端点
import { WsAdapter } from '@nestjs/platform-ws';
import {
MessageBody,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'
@WebSocketGateway()
export class EventsGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('events')
// send {"event":"events","data":"test"} in websockets
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return from(longRunningProcess) // Not really sure how to return this
//return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item }))); //<< this works from the sample
}
@SubscribeMessage('identity')
async identity (@MessageBody() data: number): Promise<number> {
return data;
}
}
解决方案
只是map
你longRunningProcess
对 numbers 数组所做的结果。
@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}
推荐阅读
- php - 使用 simplHtmlDom 找不到任何东西
- python - 在python中将整数转换为hh:mm:ss
- r - R xgboost 错误,输入数据包含“inf”或“nan”,但它之前已经工作过
- reactjs - 使用 mapStateToProps (react-redux) 与 store.getState() 时 this.props 中收到的值的差异
- python - Python复制列表并添加新元素
- python - 如何将 Python3 升级后的代码恢复到原始版本?
- vlang - 在 vlang 中将字符串转换为数组
- c# - 随时间滴答生成随机加密密钥并将其存储在客户端和数据库服务器 Windows 应用程序表单 C#
- javascript - 一张一张显示图片
- cypress - 赛普拉斯 - 每次 url 更改时写入文件