首页 > 解决方案 > 使用 RxJS 的排队功能

问题描述

我在后端使用 rxjs 和 NodeJS。我有一个允许消费者运行远程纱线安装过程的 Rest API。install 函数返回一个可观察的进程。因此,当模块安装成功时,它会在 observable 和 complete 中发出一个值。此时,Rest API 会返回一个响应给用户说安装成功。如果安装失败,该过程将在流中抛出一个错误,并且 Rest API 返回另一个带有错误信息的响应。

我的问题是:

该 API 被消费者并行调用多次,因此后端会有并行安装。

我尝试油门操作员创建一个队列,但它保持第一个流处于活动状态。所以如果第一个进程“完成”,它返回“真”但流没有完成

export class MyService {
    // the function called by the REST API
    installGlobal(moduleName: string): Observable < boolean > {
        // I think, there are something to do here to make it queuing
        return this.run('yarn', ['global', 'add', moduleName]);
    }

    private run(cmd: string, args: string[]): Observable < boolean > {
        const cmd$ = fromPromise(spawn(cmd, args)).pipe(
            map(stdout => {
                this.logger.info(`Install Module Successfully`);
                this.logger.info(`stdout: ${stdout.toString()}`);
                return true;
            }),
            catchError(error => {
                const errorMessage: string = error.stderr.toString();
                return _throw(errorMessage.substr(errorMessage.indexOf(' ') + 1));
            })
        );
        return cmd$;
    }
} 

我的期望:

要么有多个请求,要么必须排队。所以第一个将被处理,所有并行的一次都必须排队。当第一个被处理时,它必须将响应返回给 API 消费者(如 200 完成)并从队列中恢复下一个流。

[UPDATE-01 July 2019]:添加示例

您可以在stackblitz获取代码演示

我已经重新实现了现有代码,并且通过多次订阅将调用队列的服务来模拟我的 API 调用

标签: rxjsrxjs-pipeable-operators

解决方案


Rxjs 中的一个简单队列可以像下面这样完成

const queque=new Subject()
// sequential processing
queue.pipe(concatMap(item=>yourObservableFunction(item)).subscribe()
// add stuff to the queue 
queque.next(item)

推荐阅读