首页 > 解决方案 > 观察者模式设计和 RXJS 库

问题描述

我是库 RXJS 的新手用户,并试图弄清楚如何正确使用 Observable 和 Subjects。我正在尝试与模式设计观察者进行比较。在某些时候,我有一个问题,如果库 RXJS 中的 Observable 实例是观察者模式设计的一个特例?

标签: design-patternsrxjsobservableobserver-patternrxjs-observables

解决方案


根据定义, AnObservable是随着时间的推移发出数据的实体。这听起来有点模糊,同时也很有趣。

在我看来,所有 RxJS 的魔力都是通过链表实现的。

每当您创建Observableusingnew Observable(subscriber => {})时,您都在定义源或链表的HEAD节点。另外,你有没有想过为什么调用参数subscriberor observer?我也会尝试分享我对此的看法。

链表是在以下帮助下创建的Observable.pipe()

pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
  return operations.length ? pipeFromArray(operations)(this) : this;
}

export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
  if (fns.length === 0) {
    return identity as UnaryFunction<any, any>;
  }

  if (fns.length === 1) {
    return fns[0];
  }

  return function piped(input: T): R {
    return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
  };
}

Observable.lift()

protected lift<R>(operator?: Operator<T, R>): Observable<R> {
  const observable = new Observable<R>();
  observable.source = this;
  observable.operator = operator;
  return observable;
}

如你所知,在 RxJS 中有很多操作符。Anoperator是一个函数,它返回另一个函数,其参数是Observable(类型T)并且其返回值也是Observable(类型R)。

例如map()

export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
  return function mapOperation(source: Observable<T>): Observable<R> {
    if (typeof project !== 'function') {
      throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
    }
    return lift(source, new MapOperator(project, thisArg));
  };
}

所以,当你有

const src$ = new Observable(s => /* ... */)
  .pipe(
    map(/* ... */)
  )

会发生一些事情:

  • 首先,它将创建Observable实例;提供的回调(在这种情况下s => ...)将存储在_subscribe属性中
  • pipe()被调用;它会返回fns[0],在这种情况下是mapOperation函数
  • mapOperation将使用Observable实例作为其参数(从pipeFromArray(operations)(this))调用;调用时,它将调用source.lift(new MapOperator(project, thisArg));Observable.lift()是将节点添加到此链表的原因;如您所见,一个节点(除了HEAD)包含代表它的sourceoperator

当您订阅 时src$,将根据此列表创建另一个。在这个中,每个节点都是一个Subscriber. 这个列表的创建是基于每个operator 必须有一个call方法的事实

export interface Operator<T, R> {
  call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}

MapOperator也不例外

export class MapOperator<T, R> implements Operator<T, R> {
  constructor(private project: (value: T, index: number) => R, private thisArg: any) {
  }

  call(subscriber: Subscriber<R>, source: any): any {
    return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
  }
}

Subscriber节点之间的关系建立在Observable.subscribe()

在这种情况下,(上例)中的s参数new Observable(s => ...)将是MapSubscriber.

似乎我偏离了这个问题,但是通过上述解释,我想证明这里没有太多的Observer模式。

这种模式可以通过一个Subject扩展Observable实现:

export class Subject<T> extends Observable<T> implements SubscriptionLike { }

这意味着您可以使用Subject.pipe(...)and Subject.subscribe(subscriber)Subject为了实现这种模式,需要有一个自定义 _subscribe方法:

_subscribe(subscriber: Subscriber<T>): Subscription {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  } else if (this.hasError) {
    subscriber.error(this.thrownError);
    return Subscription.EMPTY;
  } else if (this.isStopped) {
    subscriber.complete();
    return Subscription.EMPTY;
  } else {
    
    // !!!
    this.observers.push(subscriber);
    return new SubjectSubscription(this, subscriber);
  }
}

如您所见,Subject该类跟踪其观察者(订阅者),因此当它发出一个值时,它的Subject.next()所有观察者都会收到它:

next(value: T) {
  if (this.closed) {
    throw new ObjectUnsubscribedError();
  }
  if (!this.isStopped) {
    const { observers } = this;
    const len = observers.length;
    const copy = observers.slice();
    for (let i = 0; i < len; i++) {
      copy[i].next(value!);
    }
  }
  }

a 作为侧节点,aSubject也可以充当 a Subscriber,所以你不必一直手动调用Subject.{next, error, complete}()。你可以用这样的东西来实现

src$.pipe(subjectInstance);

推荐阅读