首页 > 解决方案 > 如何通过 RXJS 中的另一个 observable 修改 observable 流?角

问题描述

我想在我的ships$when 标志areShipsExpanded$将更改时更改嵌套标志。我怎样才能做到这一点。这是我到目前为止所拥有的:

areShipsExpanded$: Observable<boolean>;
ships$: Observable<Ship>;

constructor() {
   this.ships$ = this.shipsDataSource.getData().pipe(shareReplay());
}

ngOnInit(): void {
    this.areShipsExpanded$.pipe(
        map(flag => {
            if (flag) {
                this.ships$.pipe(map(s => s.items.forEach(r => r.isExpanded = flag)));
            }
        })
    ).subscribe();
}

不幸的是,ships$ 似乎没有被修改,或者我应该以某种方式刷新这个ships$observable,因为我以这种方式在我的视图中使用它:<ng-container *ngFor="let ship of (ships$ | async)?.items">


@编辑

我是这样做的,是正确的,还是不好的做法?

this.areShipsExpanded$.pipe(
    map(flag => {
        if (flag) {
            this.ships$ = this.ships$.pipe(map(s => {
                s.items.forEach(r => r.isExpanded = flag);
                return s;
            }));
        }
    })
).subscribe();

@EDIT2

我用过combineLatest,现在应该可以了吧?

shipsData$: Observable<Ship>;
ships$: Observable<Ship>;

constructor() {
   this.shipsData$ = this.shipsDataSource.getData().pipe(shareReplay());
}

this.ships$ = combineLatest(this.areShipsExpanded$, this.shipsData$).pipe
    map(([shipsExpanded, ships]) => {
        ships.items.forEach(r => r.isExpanded = shipsExpanded);
        return ships;
    }),
    share()
);

但问题是combineLatest,一开始函数将运行 2 次,因为两个 observables 都会发出值。是否有任何替代运算符的行为类似于combineLatest- 所以当任何 observable 发出一个值时,从每个 observable 发出最新值,但仅当所有 observable 都准备好时。避免一开始就重复。

标签: angularrxjsrxjs6

解决方案


希望这可以帮助您:

const fetchData$ = this.shipsDataSource.getData().pipe(shareReplay());

this.ships$ = merge(
    this.areShipsExpanded$.pipe(
        withLatestFrom(fetchData$),
        map(([shipsExpanded, ships]) => 
            // we could try to favor immutability
            ships.items.map(r => {...r, isExpanded:shipsExpanded})), 
    ), // ships after update
    fetchData$ // initial ships
).pipe(share());

推荐阅读