首页 > 解决方案 > 如何让 RxJS 可观察管道访问原始可观察对象的发射和管道的先前发射?

问题描述

我有一个 RxJS Observable,它对底层数据结构发出一系列更改——特别是来自 AngularFirestoreCollection 的 snapshotChanges()

相反,我想做的是使用Immer来维护一个不可变的结构,以便在结构上与“新”数组共享未更改的数据。

我无法解决的是如何pipe()关闭可观察对象,以便管道除了最新输出之外还snapshotChanges()可以访问先前发出的不可变数据(或首次默认值) 。snapshotChanges()

在代码中,我基本上已经拥有的是:

const docToObject = (doc) => { /* change document to fresh plain object every time */ };
const mappedData$ = snapshotChanges().pipe(
    map(changes => changes.map(change => docToObject(change.payload.doc)),
    tap(array => console.log('mutable array:', array)),
);

我本质上是在寻找这样的东西,我不知道XXX(...)应该是什么:

const newImmutableObject = (changes, old) => {
  // new immutable structure from old one + changes, structurally sharing as much as
  // possible
};
const mappedData$ = snapshotChanges().pipe(

// ==================================================================================
    XXX(...), // missing ingredient to combine snapshotChanges and previously emitted
              // value, or default to []
// ==================================================================================

    map(([snapshotChanges, prevImmutableOutput]) => newImmutableOutput(...)),
    tap(array => console.log('IMMUTABLE ARRAY with shared structure:', array)),
);

我觉得运算符接近expand我需要的,但它似乎只在后续运行中传递先前发出的值,而我也需要新发出的snapshotChanges.

给定一个 RxJS Observable 管道,我怎样才能操作这个 Observable 的排放物,同时还能访问管道之前的排放物?

标签: angularrxjsangularfire2rxjs-pipeable-operatorsimmer.js

解决方案


根据您的要求,我建议使用scan可以跟踪所有先前状态和新状态的操作员。

const newImmutableObject = (changes, old) => {
  // new immutable structure from old one + changes, structurally sharing as much as
  // possible
};
 const mappedData$ = snapshotChanges().pipe(
 scan((acc, current) => [...acc, current], []), //<-- scan is used here
 map(([snapshotChanges, prevImmutableOutput]) => newImmutableOutput(...)),
    tap(array => console.log('IMMUTABLE ARRAY with shared structure:', array)),
);

推荐阅读