首页 > 解决方案 > rxjs 的错误和完整方法如何工作?

问题描述

最近我读到了 rxjs。此代码来自文档。

import { Observable } from 'rxjs';
 
const observable = new Observable(subscriber => {
  setTimeout(() => {
    subscriber.complete();
    subscriber.next(1);
  }, 1000);
});
 
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

正如它所说,如果传递了错误或完成通知,那么之后就无法传递任何其他内容。如何使用纯 js 在后台完成?有没有办法改变订阅功能并在完成/错误方法执行时动态返回?

标签: javascriptrxjs

解决方案


RxJS 的工作方式非常有趣。我在这个SO answer中分享了我对它的看法。我试图涵盖如何Observable.pipeObservable.subscribe以及如何Subject.observers在幕后工作。

但要点是这样的(再次,这是我花了一些时间阅读源代码后的观点):

RxJS 通过链表实现其功能。

基本上,有2个链表。一个是在 的帮助下创建的Observable.pipe(),其节点由运算符(例如filtertakemap)定义,另一个是在 上创建的Observable.subscribe()。这是subscribers清单。

一个运算符伴随着一个Subscriber实例。例如,map运算符具有MapSubscriber. 在 RxJS 7 之前,操作符的订阅者是这样调用的,现在它使用一种非常聪明的方式来注入定义操作符的逻辑。

如何使用纯 js 在幕后完成

如前所述,每个Subscriber都有next(), error(),complete()方法。例如,这是发生error()

error(err?: any): void {
  if (!this.isStopped) {
    this.isStopped = true;
    this._error(err);
  }
}

isStopped确保没有其他next通知可以通过:

next(value?: T): void {
  if (!this.isStopped) {
    this._next(value!);
  }
}

当然,这不是error事件发生时唯一发生的事情,RxJS 还负责拆卸逻辑,释放资源。


推荐阅读