首页 > 解决方案 > Rxjs 订阅队列

问题描述

我的 Angular 应用程序中有一个 Firebase 订阅,它会触发多次。我如何实现将任务作为队列处理,以便我可以同步运行每个任务一次?

this.tasks.subscribe(async tasks => {
   for (const x of tasks) 
      await dolongtask(x); // has to be sync
      await removetask(x);
   });

问题是当 longtask 仍在处理时会触发 subribe 事件。

标签: javascriptfirebaserxjsreactive-programming

解决方案


恕我直言,我会尝试利用 rxjs 的强大功能,因为无论如何我们已经在这里使用它,并避免按照另一个答案的建议实施自定义排队概念(尽管您当然可以这样做)。

如果我们稍微简化一下给定的情况,我们只有一些 observable 并且想要为每个发射执行一个长时间运行的过程 -按顺序。rxjs 允许通过concatMap基本上开箱即用的操作符来做到这一点:

$data.pipe(concatMap(item => processItem(item))).subscribe();

这只假设processItem返回一个可观察的。由于您使用过await,我假设您的函数当前返回 Promises。这些可以使用from.

从 OP 剩下的唯一细节是 observable 实际上发出了一个项目数组,我们希望对每个发射的每个项目执行操作。为此,我们只需使用mergeMap.


让我们把它们放在一起。请注意,如果您不准备一些存根数据和日志记录,那么实际的实现只有行代码(使用mergeMap + concatMap)。

const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;

// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));

// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);

tasks$.pipe(
  tap(task => console.log("Received task: ", task)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>


推荐阅读