首页 > 解决方案 > rx.js 6.3 中的 queueScheduler 是同步的 - 如果我使用 queueScheduler,为什么这个示例不会导致 SO?

问题描述

我有一个有趣的例子,不是现实生活中的任务,但无论如何:

const signal = new Subject();
let count = 0;

const somecalculations = (count) => console.log('do some calculations with ', count);


console.log('Start');
signal.pipe(take(1500)/*, observeOn(queueScheduler)*/)
  .subscribe(() => {
  somecalculations(count);
  signal.next(count++);
  console.log('check if reached ', count)
});

signal.next(count++);
console.log('Stop');

密码笔

Subject.next 以同步方式工作,所以如果我注释掉 observeOn(queueScheduler) - 它会导致堆栈溢出(我使用 take 运算符控制迭代次数,如果数字大于 1370 在我的计算机上 - 它会导致 SO)。

但是,如果我将 queueScheduler 放在那里-效果很好。QueueScheduler 是同步的,它以某种方式允许当前 onNext 处理程序运行完成运行,然后开始下一个计划运行。

有人可以用源代码详细信息向我解释一下吗?我试图挖掘它,但目前取得了部分成功。这是关于 observeOn 如何与 QueueScheduler 一起工作的,但答案让我无法理解。

observeOn src QueueScheduler.ts asyncScheduler

标签: rxjs

解决方案


感谢carant的支持。好像我理解了为什么队列调度程序在没有 SO 的情况下工作。

  1. 当第一次从observeOn 调用 signal.next _next queueScheduler.schedule->AsyncScheduler.schedule->Scheduler.schedule 导致 QueueAction.schedule 被调用

  2. QueueAction.flush被调用。this.scheduler.flush -> QueueSchedulerFlush-> AsyncScheduler.flush

  3. 第一次队列是空的,没有任务被执行,所以 this.active 是假的。此action.execute的 bc被调用。一切都以同步方式调用。

  4. action.execute 导致 onNext 函数再次运行。所以 onNext 调用 signal.next 它经过所有 1-3 点,但现在 this.active 为真(因为它实际上仍然是之前的 signal.next 运行),我们只是排队操作

  5. 所以第二个 signal.next 被处理,我们返回到第一个 signal.next 调用的 action.execute。它在执行时起作用,并一一转换动作。所以它完成了第一个 signal.next 动作的运行——但是现在我们在第二个 signal.next 递归调用中还有一个队列。所以我们为第二个 signal.next 运行 action.execute

  6. 而且情况正在重演。第一次刷新调用管理所有其他调用,例如:活动为真,我们将任务添加到队列中,然后重复上一次刷新调用并从队列中获取它。


推荐阅读