design-patterns - 观察者模式设计和 RXJS 库
问题描述
我是库 RXJS 的新手用户,并试图弄清楚如何正确使用 Observable 和 Subjects。我正在尝试与模式设计观察者进行比较。在某些时候,我有一个问题,如果库 RXJS 中的 Observable 实例是观察者模式设计的一个特例?
解决方案
根据定义, AnObservable
是随着时间的推移发出数据的实体。这听起来有点模糊,同时也很有趣。
在我看来,所有 RxJS 的魔力都是通过链表实现的。
每当您创建Observable
usingnew Observable(subscriber => {})
时,您都在定义源或链表的HEAD节点。另外,你有没有想过为什么调用参数subscriber
or observer
?我也会尝试分享我对此的看法。
主链表是在以下帮助下创建的Observable.pipe()
:
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return operations.length ? pipeFromArray(operations)(this) : this;
}
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
};
}
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
如你所知,在 RxJS 中有很多操作符。Anoperator
是一个函数,它返回另一个函数,其参数是Observable
(类型T
)并且其返回值也是Observable
(类型R
)。
例如map()
:
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable<T>): Observable<R> {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return lift(source, new MapOperator(project, thisArg));
};
}
所以,当你有
const src$ = new Observable(s => /* ... */)
.pipe(
map(/* ... */)
)
会发生一些事情:
- 首先,它将创建
Observable
实例;提供的回调(在这种情况下s => ...
)将存储在_subscribe
属性中 pipe()
被调用;它会返回fns[0]
,在这种情况下是mapOperation
函数mapOperation
将使用Observable
实例作为其参数(从pipeFromArray(operations)(this)
)调用;调用时,它将调用source.lift(new MapOperator(project, thisArg));
;Observable.lift()
是将节点添加到此链表的原因;如您所见,一个节点(除了HEAD
)包含代表它的source
和operator
当您订阅 时src$
,将根据此列表创建另一个。在这个中,每个节点都是一个Subscriber
. 这个列表的创建是基于每个operator
必须有一个call
方法的事实
export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}
Subscriber
节点之间的关系建立在Observable.subscribe()
在这种情况下,(上例)中的s
参数new Observable(s => ...)
将是MapSubscriber
.
似乎我偏离了这个问题,但是通过上述解释,我想证明这里没有太多的Observer
模式。
这种模式可以通过一个Subject
扩展来Observable
实现:
export class Subject<T> extends Observable<T> implements SubscriptionLike { }
这意味着您可以使用Subject.pipe(...)
and Subject.subscribe(subscriber)
。Subject
为了实现这种模式,需要有一个自定义 _subscribe
方法:
_subscribe(subscriber: Subscriber<T>): Subscription {
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.isStopped) {
subscriber.complete();
return Subscription.EMPTY;
} else {
// !!!
this.observers.push(subscriber);
return new SubjectSubscription(this, subscriber);
}
}
如您所见,Subject
该类跟踪其观察者(订阅者),因此当它发出一个值时,它的Subject.next()
所有观察者都会收到它:
next(value: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value!);
}
}
}
a 作为侧节点,aSubject
也可以充当 a Subscriber
,所以你不必一直手动调用Subject.{next, error, complete}()
。你可以用这样的东西来实现
src$.pipe(subjectInstance);
推荐阅读
- java - 播放和停止按钮不能一起使用
- javascript - 这个 JavaScript 注入(基于 DOM)是真的还是假阳性?
- asp.net-core - 使用 ASP.NET Core 应用程序将各种文件保存在数据库中
- python - django w/uwsgi 致命 Python 错误:initfsencoding:无法获取语言环境编码
- python - Boost:致命错误 LNK1104:无法打开文件(lib)
- sql-server - 有没有办法从 Entity Framework Core 的层次结构表中获取 First() ?
- prolog - 有没有办法在 Prolog 中扩展爱因斯坦的谜语?
- javascript - JS 如何在 Firefox 中分别检查对网络摄像头和麦克风的访问权限
- django - 访问 ModelForm 中的其他字段
- java - 对象的Arraylist找到所有不同的属性对象