首页 > 解决方案 > 观察者是 RxJS 中 Observable 的“听众”吗?

问题描述

我正在学习 RxJS 并且对“监听器”在哪里(在 Observable 或 Observer 中)、它们如何订阅/取消订阅以及当观察器“不再对”Observable 感兴趣时会发生什么感到困惑,例如就像你使用takeor时一样takeUntil

对于第一部分——什么订阅了什么,什么是听众——我对这些陈述之间的矛盾感到困惑。从http://reactivex.io/rxjs/manual/overview.html我们读到 Observers 不是 Observables 的“监听器”

这与 addEventListener / removeEventListener 等事件处理程序 API 截然不同。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护附加的观察者列表。

但在http://reactivex.io/learnrx/它说(练习 30)(突出显示我的)

基于 Event 的 Observable 永远不会自行完成。take() 函数创建一个新序列,该序列在离散数量的项目到达后完成。这很重要,因为与 Event 不同的是,当 Observable 序列完成时,它会取消订阅其所有侦听器。这意味着如果我们使用 take() 来完成我们的事件序列,我们不需要取消订阅!

这对我来说似乎很矛盾。例如,当您使用 设置 Observable 时,fromEvent事件侦听器在哪里?例如,当您take(1)在基于 DOM 事件的 Observable 上使用 时,第一个事件发送给观察者后会发生什么?Observer 是否取消了 Observable 的订阅,它继续发出事件,只是 Observer 不再监听它们了吗?或者 Observable 是否以某种方式取消订阅 Observer,即 eventListener 在 Observable 中,而不是 Observer?

感谢您提供任何线索 - 显然我不是只见树木不见森林,但我正在研究的教程,虽然他们擅长从概念上解释它,但让我对实际发生的事情感到困惑。

标签: javascriptrxjsobservable

解决方案


第一部分对单词的使用非常讲究,以强调订阅可观察对象是调用函数(或更可能是函数链)以运行它们包含的所有代码的问题。第二部分对其措辞不太讲究,但实际上并不是在谈论同一件事。如果你愿意,第二部分的措辞会更好,“当一个可观察对象完成时,它会在其观察者身上调用拆解逻辑。

当我说订阅一个可观察对象是调用一系列函数时,让我试着描述一下我的意思。考虑以下超级简单的示例:

对于一个超级简单的例子,假设我创建了这个 observable:

const justOne = Rx.Observable.create(function realSubscribe(observer) {
  observer.next(1);
  observer.complete();
});

justOne.subscribe(val => console.log(val));

如果我随后调用justOne.subscribe(val => console.log(val)),这样做将立即调用我命名为 realSubscribe 的函数。然后它会执行observer.next(1),这会导致注销 val,然后它会执行observer.complete()。就是这样。

在这个过程中,observable 没有创建或增加订阅者列表;它只是按顺序运行代码然后完成。


现在转到一个更实际的例子,让我们考虑fromEvent. 如果我要实现它,它可能看起来像这样(真正的实现更复杂,但这得到了它的要点):

function fromEvent(element, eventName) {
  return Rx.Observable.create(function subscribeToEvent(observer) {
    element.addEventListener(eventName, observer.next);
    return function cleanup() {
      element.removeEventListener(eventName, observer.next);
    }
  });
}

const observable = fromEvent(document, 'click');
const subscription = observable.subscribe(event => console.log(event));

现在,当我调用 observable.subscribe 时,它​​会运行 subscribeToEvent,并在这样做时调用文档上的 addEventListener。document.addEventListener确实会导致文档保留事件侦听器列表,但这是因为 addEventListener 的实现方式,而不是所有可观察对象共有的东西。可观察对象本身不会跟踪任何侦听器。它只是调用它被告知要调用的内容,然后返回一个清理函数。


接下来让我们看一下。和以前一样,真正的实现更加复杂,但大致是这样的:

// In the real `take`, you don't need to pass in another observable since that's
// available automatically from the context you called it in. But my sample code
// has to get it somehow.
function take(count, otherObservable) {
  return new Observable(function subscribeToTake(observer) {
    let soFar = 0;
    otherObservable.subscribe((value) => {
      observer.next(value);
      soFar++;
      if (soFar >= count) {
        observer.complete();
      }
    });
  });
}

const clickObservable = fromEvent(document, 'click');
take(1, clickObservable).subscribe(event => console.log(event))

正如评论中提到的,我使用的语法与它在 rxjs 中的使用方式不太匹配,但这是因为要模仿它需要更完整的实现。无论如何,要引起您注意的主要是我们开始生成一系列函数:

当我打电话.subscribe时,它会调用 subscribeToTake。这会设置一个计数器,然后调用 otherObservable.subscribe,即 subscribeToEvent。subscribeToEvent 然后调用 document.addEventListener。

Take 的工作是坐在这个功能链的中间。它跟踪到目前为止已经发出了多少值。如果计数足够低,它只会转发值。但是一旦达到计数,它将调用完成,从而结束可观察的。调用 complete 会导致 observable 运行它拥有的任何拆卸逻辑,或者它的链拥有的任何东西。没有拆解逻辑take,但fromEvent会运行一些拆解逻辑来删除事件侦听器。


推荐阅读