javascript - Rxjs 订阅队列
问题描述
我的 Angular 应用程序中有一个 Firebase 订阅,它会触发多次。我如何实现将任务作为队列处理,以便我可以同步运行每个任务一次?
this.tasks.subscribe(async tasks => {
for (const x of tasks)
await dolongtask(x); // has to be sync
await removetask(x);
});
问题是当 longtask 仍在处理时会触发 subribe 事件。
解决方案
恕我直言,我会尝试利用 rxjs 的强大功能,因为无论如何我们已经在这里使用它,并避免按照另一个答案的建议实施自定义排队概念(尽管您当然可以这样做)。
如果我们稍微简化一下给定的情况,我们只有一些 observable 并且想要为每个发射执行一个长时间运行的过程 -按顺序。rxjs 允许通过concatMap
基本上开箱即用的操作符来做到这一点:
$data.pipe(concatMap(item => processItem(item))).subscribe();
这只假设processItem
返回一个可观察的。由于您使用过await
,我假设您的函数当前返回 Promises。这些可以使用from
.
从 OP 剩下的唯一细节是 observable 实际上发出了一个项目数组,我们希望对每个发射的每个项目执行操作。为此,我们只需使用mergeMap
.
让我们把它们放在一起。请注意,如果您不准备一些存根数据和日志记录,那么实际的实现只有两行代码(使用mergeMap + concatMap)。
const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;
// Stub for the long-running operation
function processTask(task) {
console.log("Processing task: ", task);
return new Promise(resolve => {
setTimeout(() => {
console.log("Finished task: ", task);
resolve(task);
}, 500 * Math.random() + 300);
});
}
// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));
// Some stubbed data stream
const tasks$ = interval(250).pipe(
take(9),
bufferCount(3),
);
tasks$.pipe(
tap(task => console.log("Received task: ", task)),
// Flatten the tasks array since we want to work in sequence anyway
mergeMap(tasks => tasks),
// Process each task, but do so consecutively
concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
推荐阅读
- sql - 如何使用 CASE 条件过滤双重数据?
- java - 将字符串返回给 main 方法
- cookies - 检查第三方 Cookie 是否被禁用
- google-maps - 在颤动中单击时更改折线的颜色
- spring-boot - 我可以从 GraphQLResolver 安全地回调 GraphQLQueryResolver 吗?
- python - ValueError:在 Django 框架中没有足够的值来解包(预期 2,得到 1)
- pine-script - 从脚本中排除今天的数据
- reporting - 在 splunk 中将多个报告作为一封电子邮件发送
- python - 无法在 ubuntu 中杀死 conda
- python - matplotlib 在极坐标图中颤动