rxjs - 使用 RxJS 的排队功能
问题描述
我在后端使用 rxjs 和 NodeJS。我有一个允许消费者运行远程纱线安装过程的 Rest API。install 函数返回一个可观察的进程。因此,当模块安装成功时,它会在 observable 和 complete 中发出一个值。此时,Rest API 会返回一个响应给用户说安装成功。如果安装失败,该过程将在流中抛出一个错误,并且 Rest API 返回另一个带有错误信息的响应。
我的问题是:
该 API 被消费者并行调用多次,因此后端会有并行安装。
我尝试油门操作员创建一个队列,但它保持第一个流处于活动状态。所以如果第一个进程“完成”,它返回“真”但流没有完成
export class MyService {
// the function called by the REST API
installGlobal(moduleName: string): Observable < boolean > {
// I think, there are something to do here to make it queuing
return this.run('yarn', ['global', 'add', moduleName]);
}
private run(cmd: string, args: string[]): Observable < boolean > {
const cmd$ = fromPromise(spawn(cmd, args)).pipe(
map(stdout => {
this.logger.info(`Install Module Successfully`);
this.logger.info(`stdout: ${stdout.toString()}`);
return true;
}),
catchError(error => {
const errorMessage: string = error.stderr.toString();
return _throw(errorMessage.substr(errorMessage.indexOf(' ') + 1));
})
);
return cmd$;
}
}
我的期望:
要么有多个请求,要么必须排队。所以第一个将被处理,所有并行的一次都必须排队。当第一个被处理时,它必须将响应返回给 API 消费者(如 200 完成)并从队列中恢复下一个流。
[UPDATE-01 July 2019]:添加示例
您可以在stackblitz获取代码演示
我已经重新实现了现有代码,并且通过多次订阅将调用队列的服务来模拟我的 API 调用
解决方案
Rxjs 中的一个简单队列可以像下面这样完成
const queque=new Subject()
// sequential processing
queue.pipe(concatMap(item=>yourObservableFunction(item)).subscribe()
// add stuff to the queue
queque.next(item)
推荐阅读
- javascript - window.location.href 不重定向
- java - 显示来自 4 个具有不同 ID 的源的表面视图的 4 个相机预览,并从每个源捕获图像
- ios - 如何访问变量以观察 SwiftUI UI 测试的变化?
- php - 如何将特定类别的产品页面重定向到 Woocommerce 中的购物车页面
- java - 在两个属性上比较两个 POJO 类
- sql - 函数参数,Postgres
- pandas - 有什么问题?:“read_excel() 有一个意外的关键字参数 'fillna'”
- amazon-dynamodb - 如何为用户 id 等于 DyanmoDB 表名的 DynamoDB 创建策略?
- google-bigquery - 如何删除 GCP BigQuery 中文件名中有空格的表
- php - Laravel yajra 表格分页服务器端,下一页为空