首页 > 解决方案 > 如何使用 RxJS 来协调串行和并行作业?

问题描述

使用 RxJS,我正在努力寻找一种方法来处理串行和并行作业的混合。甚至改变/更改作业队列的处理并发性。

下面是一个使用并发硬编码处理数组中项目的基本工作示例:

import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'

const delay = 1000
const concurrency = 2

const doSomethingSlow = async (val) => {
  await new Promise(resolve => {
    setTimeout(() => {
      console.log(`${val} - Done`)
      resolve()
    }, delay)
  })
}

const jobs = [
  1,2,3,4,5,6,7,8,9,10
]
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))

observables
  .pipe(
    mergeAll(concurrency)
  )
  .subscribe()

https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts

我想做的是将父级数组作为包含Parallel Jobs子数组的Serial Tasks列表。例如。

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
]

希望1是首先处理作业,并且在它完成之前没有其他任何事情开始。然后,作业23被并行拾取,并且只要插槽从两个可用的并发处理器中打开,就会处理4通过的作业。11只有当他们都完成后,才会12接手工作13

我真的无法想象解决这个问题的正确方法。我试图mergeobserables其中有自己pipe(mergeAll(num))的东西分开,但这似乎不起作用。我有一种感觉,这switchMap可能是这里的关键,但我不确定在这种情况下如何准确地利用它。

我的另一种想法是尝试动态更改并发值,但这似乎更难做到,并且可能不会导致我正在寻找的行为。

任何帮助将不胜感激。

标签: javascriptconcurrencyrxjsqueue

解决方案


mergeMap有一个可选concurrent参数,告诉它一次应该订阅多少个内部可观察对象。其实concatMap就是一个mergeMapwith1并发。所以你可以利用它来发挥你的优势:

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
];

from(tasks)
  .pipe(
    concatMap(task => from(task.jobs).pipe( // single task such as `{ concurrency: 1, jobs: [1] }`
      mergeMap(job => doSyncJob(job), task.concurrency) // `task.concurrency` is the important part here
      // maybe `toArray()` to collect all results for this task into a single array
    )),
  )
  .subscribe(...);


推荐阅读