Merging multiple observables while keeping old values

Problem description

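Hey, I'm requesting data from a GraphQL API. The service delivers results in batches of 10 items. My data is date-based, so for now I want to download all the data for one month and load the next month when a button is clicked. My approach works well for a single month, where I watch for the next token in the response, but when I try to combine the new month's data with the old data (by merging the observables), I only receive the new data.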

Currently, I also have to re-subscribe whenever I want to receive new data.

My service code

import { Injectable } from '@angular/core';
import { TrainingSession, TrainingSessionConnection } from '../shared/interfaces/trainingsession';
import { Observable, BehaviorSubject, of, from, EMPTY, Subject, merge, zip, empty, combineLatest } from 'rxjs';
import { expand, scan, map, shareReplay, mergeMap, combineAll } from 'rxjs/operators';
import { AmplifyService } from 'aws-amplify-angular';
import { APIService, SessionByMyGoalQuery } from '../API.service';
import { startOfMonth, endOfMonth, format } from 'date-fns';

@Injectable({
    providedIn: 'root',
})
export class TrainingFacadeService {
    private sessions$: Observable<TrainingSessionConnection[]> = EMPTY;

    // TODO: implement caching

    constructor(private amplifyService: AmplifyService, private apiService: APIService) {}

    private loadPage(index: string, userID: string, startDate): Observable<SessionByMyGoalQuery> {
        return from(
            this.apiService.SessionByMyGoal(
                userID,
                {
                    between: [
                        { date: format(startOfMonth(startDate), 'yyyy-MM-dd') },
                        { date: format(endOfMonth(startDate), 'yyyy-MM-dd') },
                    ],
                },
                null,
                null,
                index
            )
        );
    }
    /**
     * Groups an array of sessions by date. This is useful for our stacked cards,
     * so that a card stack just gets an array grouped by date.
     */
    private groupByDate(sessionArray): TrainingSessionConnection[] {
        const sessionMap = new Map(sessionArray.map(({ date }) => [date, { date, items: [] }]));
        sessionArray.forEach(session => {
            (sessionMap.get(session.date) as TrainingSessionConnection).items.push(session);
        });
        const result: TrainingSessionConnection[] = (Array.from(
            sessionMap.values()
        ) as unknown) as TrainingSessionConnection[];
        return result;
    }

    loadAllSessions(startIdx = null, userID: string, date: Subject<Date>): Observable<TrainingSessionConnection[]> {
        // if (!this.sessions$) {
        //  this.sessions$ = this.requestSessions(startIdx, userID).pipe(shareReplay(1));
        // }
        date.subscribe(currentDate => {
            console.log('called');
            // this.sessions$ = this.requestSessions(startIdx, userID, currentDate).pipe(shareReplay(1));

            this.sessions$ = merge(this.sessions$, this.requestSessions(startIdx, userID, currentDate));
        });

        return this.sessions$;
    }

    private requestSessions(startIdx = null, userID: string, date) {
        return this.loadPage(startIdx, userID, date).pipe(
            // Take each response and, if it has a nextToken, trigger the next request, because the API returns at most 10 items per call
            expand(sessions => {
                return sessions.nextToken ? this.loadPage(sessions.nextToken, userID, date) : EMPTY;
            }),
            // Map each response to our connection model, because the items aren't grouped
            map(sessions => {
                return this.groupByDate(sessions.items);
            }),
            scan((all, current) => {
                return [...all, ...current];
            }),
            shareReplay(1)
        );
    }
}

My component code

import { Component, OnInit, ViewChild } from '@angular/core';
import { TrainDataService } from './traindata.service';
import { MatDatepicker } from '@angular/material';
import { APIService, SessionByMyGoalQuery } from '../API.service';
import { AmplifyService } from 'aws-amplify-angular';
import { TrainingSessionConnection, TrainingSession } from '../shared/interfaces/trainingsession';
import { TrainingFacadeService } from './training-facade.service';
import { scan, mergeMap } from 'rxjs/operators';
import { Observable, Subject, BehaviorSubject, EMPTY } from 'rxjs';
import { addDays, subDays, setDate } from 'date-fns';

@Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.scss'],
})
export class HomeComponent implements OnInit {
    constructor(
        public trainDataService: TrainDataService,
        private apiService: APIService,
        private amplifyService: AmplifyService,
        private trainingFacade: TrainingFacadeService
    ) {}

    sessions: Observable<TrainingSessionConnection[]> = EMPTY;
    date: BehaviorSubject<Date> = new BehaviorSubject(new Date('2019-01-05'));

    ngOnInit() {
        this.getBatch(null);
    }

    getBatch(offset: string) {
        this.amplifyService
            .auth()
            .currentAuthenticatedUser()
            .then(user => {
                this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date).pipe(
                    scan((all, current) => {
                        console.log(all);
                        console.log(current);
                        return [...all, ...current];
                    })
                );
            });
    }

    fetchNextMonth() {
        this.date.next(addDays(this.date.value, 31));
        this.amplifyService
            .auth()
            .currentAuthenticatedUser()
            .then(user => {
                this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date);
            });
    }

    fetchPreviousMonth() {
        this.date.next(subDays(this.date.value, 31));
        this.amplifyService
            .auth()
            .currentAuthenticatedUser()
            .then(user => {
                this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date);
            });
    }
}

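The behavior I want is that the component can trigger new requests by changing the date observable. Once it has subscribed to the data, it should receive all newly loaded data while keeping the old data.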

Tags: angular, rxjs

Solution
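
One possible approach, sketched under assumptions rather than taken from an accepted answer: `merge` forwards each source's emissions separately and does not concatenate arrays, so a consumer that only keeps the latest emission sees just the newest batch. Instead of reassigning `sessions$` inside a `subscribe` callback, the date stream itself can drive the requests, with `scan` accumulating the months loaded so far. In the sketch below, `Session`, `Page`, `fakePages`, `loadMonth` and `accumulateSessions` are hypothetical names, months are plain strings, and `loadPage` is a stand-in for the real paginated API call.

```typescript
import { EMPTY, Observable, Subject, of } from 'rxjs';
import { concatMap, expand, reduce, scan } from 'rxjs/operators';

// Hypothetical shapes standing in for SessionByMyGoalQuery and its items.
interface Session { date: string; }
interface Page { items: Session[]; nextToken: string | null; }

// Hypothetical fake backend, keyed by "month|token" (illustration only).
const fakePages: Record<string, Page> = {
    '2019-01|start': { items: [{ date: '2019-01-05' }], nextToken: 'p2' },
    '2019-01|p2': { items: [{ date: '2019-01-20' }], nextToken: null },
    '2019-02|start': { items: [{ date: '2019-02-03' }], nextToken: null },
};

// Stand-in for the service's loadPage(); a real version would wrap the API promise with from().
function loadPage(month: string, token: string | null): Observable<Page> {
    const page = fakePages[`${month}|${token || 'start'}`];
    return of(page || { items: [], nextToken: null });
}

// Follow nextToken until the month is exhausted, then emit that month's sessions once.
function loadMonth(month: string): Observable<Session[]> {
    return loadPage(month, null).pipe(
        expand(page => (page.nextToken ? loadPage(month, page.nextToken) : EMPTY)),
        reduce((all: Session[], page: Page) => [...all, ...page.items], [] as Session[])
    );
}

// Every month pushed into month$ triggers one loadMonth(); scan keeps everything loaded so far.
function accumulateSessions(month$: Observable<string>): Observable<Session[]> {
    return month$.pipe(
        concatMap(month => loadMonth(month)),
        scan((all: Session[], monthItems: Session[]) => [...all, ...monthItems], [] as Session[])
    );
}

// Usage: subscribe once, then push months into the subject (for example from a button handler).
const month$ = new Subject<string>();
const snapshots: Session[][] = [];
accumulateSessions(month$).subscribe(sessions => snapshots.push(sessions));
month$.next('2019-01');
month$.next('2019-02');
```

With this shape the component would subscribe a single time (for instance through the `async` pipe) and only call `next()` on its date subject in `fetchNextMonth()` and `fetchPreviousMonth()`. `concatMap` is used here so that months are appended in the order they were requested; `mergeMap` would also work if ordering does not matter. Grouping by date, as `groupByDate` does in the question, could be applied to the accumulated array in a final `map`.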

