javascript - 如何使用 RxJS 来协调串行和并行作业?
问题描述
使用 RxJS,我正在努力寻找一种方法来处理串行和并行作业的混合。甚至改变/更改作业队列的处理并发性。
下面是一个使用并发硬编码处理数组中项目的基本工作示例:
import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'
const delay = 1000
const concurrency = 2
const doSomethingSlow = async (val) => {
await new Promise(resolve => {
setTimeout(() => {
console.log(`${val} - Done`)
resolve()
}, delay)
})
}
const jobs = [
1,2,3,4,5,6,7,8,9,10
]
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))
observables
.pipe(
mergeAll(concurrency)
)
.subscribe()
https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts
我想做的是将父级数组作为包含Parallel Jobs子数组的Serial Tasks列表。例如。
const tasks = [
{ concurrency: 1, jobs: [1] },
{ concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
{ concurrency: 1, jobs: [12, 13] }
]
希望1
是首先处理作业,并且在它完成之前没有其他任何事情开始。然后,作业2
和3
被并行拾取,并且只要插槽从两个可用的并发处理器中打开,就会处理4
通过的作业。11
只有当他们都完成后,才会12
接手工作13
。
我真的无法想象解决这个问题的正确方法。我试图merge
将obserables
其中有自己pipe(mergeAll(num))
的东西分开,但这似乎不起作用。我有一种感觉,这switchMap
可能是这里的关键,但我不确定在这种情况下如何准确地利用它。
我的另一种想法是尝试动态更改并发值,但这似乎更难做到,并且可能不会导致我正在寻找的行为。
任何帮助将不胜感激。
解决方案
mergeMap
有一个可选concurrent
参数,告诉它一次应该订阅多少个内部可观察对象。其实concatMap
就是一个mergeMap
with1
并发。所以你可以利用它来发挥你的优势:
const tasks = [
{ concurrency: 1, jobs: [1] },
{ concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
{ concurrency: 1, jobs: [12, 13] }
];
from(tasks)
.pipe(
concatMap(task => from(task.jobs).pipe( // single task such as `{ concurrency: 1, jobs: [1] }`
mergeMap(job => doSyncJob(job), task.concurrency) // `task.concurrency` is the important part here
// maybe `toArray()` to collect all results for this task into a single array
)),
)
.subscribe(...);
推荐阅读
- python - 获取类型错误:int 类型的参数在 Python 中不可迭代
- android - 我的应用程序中的 TextInput 使我的键盘永久滞后
- python - SVM+HOG目标检测器
- c++ - C++:构建多功能计算器
- html - 包装时如何删除导航栏的最后一项?
- python - TypeError:预期的 dtype 对象,得到 'numpy.dtype[int64]'
- flask - 使用 Flask Rest API 和 React 前端时出现 cors 问题
- asp.net - API 中的会话管理
- css - BigCommerce:如何整理 theme.scss.json 文件?
- python-3.x - 在 python 中使用 pygit2 模块时出错