首页 > 解决方案 > Observable 与多个订阅者一起执行一次

问题描述

我有一段代码要定期执行,直到所有订阅者都取消订阅。

// This function shall be called *once* per tick,
// no matter the quantity of subscriber.
function doSomething(val) {
    console.log("doing something");
    return val;
}

observable = Rx.Observable.timer(0, 1000).map(val => doSomething(val));

const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));

// After 1.5 seconds, stop first.
Rx.Observable.timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
Rx.Observable.timer(2500).subscribe(_ => second.unsubscribe());

JSFiddle

我的预期输出如下所示:

doing something
first: 0
second: 0
doing something
first: 1
second: 1
doing something
second: 2
<nothing more>

但是,doSomething当调用两个 observable 时,该函数会被调用两次。这是实际的输出:

doing something
first: 0
doing something
second: 0
doing something
first: 1
doing something
second: 1
doing something
second: 2
<nothing more>

我在做设计错误吗?有没有办法做到这一点?

标签: javascriptrxjsobservable

解决方案


您看到的行为是正确的。返回的 observableinterval是冷的。也就是说,在观察者订阅之前不会创建任何计时器,并且当订阅者订阅时,创建的计时器是专门针对该订阅的。

您期望的行为可以使用share操作符来实现:

observable = Rx.Observable
  .timer(0, 1000)
  .map(val => doSomething(val))
  .share();

运营商引用计数订阅并将可观察的share源多播给多个订阅者 - 因此将只有一个间隔/计时器,在两个订阅者之间共享。

有关更多信息,您可能会发现这篇文章很有用。


推荐阅读