首页 > 解决方案 > 如何使用 ws ( Websockets ) 和 RxJs 在 NestJS 中连接一个长时间运行的进程

问题描述

我想在函数中运行代码 - 然后作为 websocket Observable 返回。有效监控长时间运行的进程。我不知道如何通过这种格式的 websocket 正确返回值。

我的长期运行过程:(显然实际上不会花费很长时间)

import { Observable } from 'rxjs';

export function longRunningProcess (): Observable<unknown> {

    return new Observable(subscriber => {
        subscriber.next('End of step 1');
        subscriber.next('End of step 2');
        subscriber.next('End of step 3');
        setTimeout(() => {
            subscriber.next('End of Step 4');
            subscriber.complete();
        }, 1000);
    });
}

返回到 ws ( Websocket ) 的我的 NestJS 端点

import { WsAdapter } from '@nestjs/platform-ws';
import {
    MessageBody,
    SubscribeMessage,
    WebSocketGateway,
    WebSocketServer,
    WsResponse,
} from '@nestjs/websockets';
import { from, Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { Server } from 'ws';
import { longRunningProcess } from './test'

@WebSocketGateway()
export class EventsGateway {
    @WebSocketServer()
    server: Server;

    @SubscribeMessage('events')
    // send {"event":"events","data":"test"} in websockets
    findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
        return from(longRunningProcess) // Not really sure how to return this 
        //return from([1, 2, 3]).pipe(map(item => ({ event: 'events', data: item })));  //<< this works from the sample
    }

    @SubscribeMessage('identity')
    async identity (@MessageBody() data: number): Promise<number> {
        return data;
    }
}

标签: rxjsnestjsws

解决方案


只是maplongRunningProcess对 numbers 数组所做的结果。

@SubscribeMessage('events')
findAll (@MessageBody() data: any): Observable<WsResponse<unknown>> {
    return longRunningProcess().pipe(map(item => ({ event: 'events', data: item })));
}

推荐阅读