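angular - Merging multiple observables while keeping old values
Problem description
Hey, I'm requesting data from a GraphQL API. The service delivers the data in batches of 10 items. My data is date-based, so for now I want to download all data for one month and load the next month when a button is clicked. My approach works fine for a single month, where I watch for the next token in the response, but when I try to merge the new data with the old data (by merging the observables), I only receive the new data.
Currently, I also have to resubscribe whenever I want to receive new data.
My service code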
import { Injectable } from '@angular/core';
import { TrainingSession, TrainingSessionConnection } from '../shared/interfaces/trainingsession';
import { Observable, BehaviorSubject, of, from, EMPTY, Subject, merge, zip, empty, combineLatest } from 'rxjs';
import { expand, scan, map, shareReplay, mergeMap, combineAll } from 'rxjs/operators';
import { AmplifyService } from 'aws-amplify-angular';
import { APIService, SessionByMyGoalQuery } from '../API.service';
import { startOfMonth, endOfMonth, format } from 'date-fns';

@Injectable({
  providedIn: 'root',
})
export class TrainingFacadeService {
  private sessions$: Observable<TrainingSessionConnection[]> = EMPTY;
  // TODO: implement caching
  constructor(private amplifyService: AmplifyService, private apiService: APIService) {}

  private loadPage(index: string, userID: string, startDate): Observable<SessionByMyGoalQuery> {
    return from(
      this.apiService.SessionByMyGoal(
        userID,
        {
          between: [
            { date: format(startOfMonth(startDate), 'yyyy-MM-dd') },
            { date: format(endOfMonth(startDate), 'yyyy-MM-dd') },
          ],
        },
        null,
        null,
        index
      )
    );
  }

  /**
   * Groups an array of sessions by date. This is useful for our stacked cards,
   * so that a card stack just receives an array already grouped by date.
   */
  private groupByDate(sessionArray): TrainingSessionConnection[] {
    const sessionMap = new Map(sessionArray.map(({ date }) => [date, { date, items: [] }]));
    sessionArray.forEach(session => {
      (sessionMap.get(session.date) as TrainingSessionConnection).items.push(session);
    });
    const result: TrainingSessionConnection[] = (Array.from(
      sessionMap.values()
    ) as unknown) as TrainingSessionConnection[];
    return result;
  }

  loadAllSessions(startIdx = null, userID: string, date: Subject<Date>): Observable<TrainingSessionConnection[]> {
    // if (!this.sessions$) {
    //   this.sessions$ = this.requestSessions(startIdx, userID).pipe(shareReplay(1));
    // }
    date.subscribe(currentDate => {
      console.log('called');
      // this.sessions$ = this.requestSessions(startIdx, userID, currentDate).pipe(shareReplay(1));
      this.sessions$ = merge(this.sessions$, this.requestSessions(startIdx, userID, currentDate));
    });
    return this.sessions$;
  }

  private requestSessions(startIdx = null, userID: string, date) {
    return this.loadPage(startIdx, userID, date).pipe(
      // Take each response and, if it has a nextToken, trigger the next request,
      // because the API returns at most 10 items per call.
      expand(sessions => {
        return sessions.nextToken ? this.loadPage(sessions.nextToken, userID, date) : EMPTY;
      }),
      // Map each response to our connection model, because the items aren't grouped.
      map(sessions => {
        return this.groupByDate(sessions.items);
      }),
      scan((all, current) => {
        return [...all, ...current];
      }),
      shareReplay(1)
    );
  }
}
My component code
import { Component, OnInit, ViewChild } from '@angular/core';
import { TrainDataService } from './traindata.service';
import { MatDatepicker } from '@angular/material';
import { APIService, SessionByMyGoalQuery } from '../API.service';
import { AmplifyService } from 'aws-amplify-angular';
import { TrainingSessionConnection, TrainingSession } from '../shared/interfaces/trainingsession';
import { TrainingFacadeService } from './training-facade.service';
import { scan, mergeMap } from 'rxjs/operators';
import { Observable, Subject, BehaviorSubject, EMPTY } from 'rxjs';
import { addDays, subDays, setDate } from 'date-fns';

@Component({
  selector: 'app-home',
  templateUrl: './home.component.html',
  styleUrls: ['./home.component.scss'],
})
export class HomeComponent implements OnInit {
  constructor(
    public trainDataService: TrainDataService,
    private apiService: APIService,
    private amplifyService: AmplifyService,
    private trainingFacade: TrainingFacadeService
  ) {}

  sessions: Observable<TrainingSessionConnection[]> = EMPTY;
  date: BehaviorSubject<Date> = new BehaviorSubject(new Date('2019-01-05'));

  ngOnInit() {
    this.getBatch(null);
  }

  getBatch(offset: string) {
    this.amplifyService
      .auth()
      .currentAuthenticatedUser()
      .then(user => {
        this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date).pipe(
          scan((all, current) => {
            console.log(all);
            console.log(current);
            return [...all, ...current];
          })
        );
      });
  }

  fetchNextMonth() {
    this.date.next(addDays(this.date.value, 31));
    this.amplifyService
      .auth()
      .currentAuthenticatedUser()
      .then(user => {
        this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date);
      });
  }

  fetchPreviousMonth() {
    this.date.next(subDays(this.date.value, 31));
    this.amplifyService
      .auth()
      .currentAuthenticatedUser()
      .then(user => {
        this.sessions = this.trainingFacade.loadAllSessions(null, user.attributes.sub, this.date);
      });
  }
}
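The behavior I want is that the component can trigger a new request by changing the date observable. Once the component has subscribed to the data, it should receive all new data while keeping the old data.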
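To make "receive the new data while keeping the old data" concrete, here is a minimal sketch of the accumulation step on its own, without RxJS. The `Session` and `SessionGroup` shapes and the `mergeGroups` helper are hypothetical stand-ins for `TrainingSession` and `TrainingSessionConnection`, not part of the code above. It also merges groups that share a date, which a plain `[...all, ...current]` would leave as separate entries.

```typescript
// Hypothetical minimal shapes; the real interfaces have more fields.
interface Session { date: string; id: string; }
interface SessionGroup { date: string; items: Session[]; }

// Fold a newly loaded batch of groups into the accumulated ones,
// merging groups that share a date instead of appending duplicates.
function mergeGroups(all: SessionGroup[], current: SessionGroup[]): SessionGroup[] {
  const byDate = new Map<string, SessionGroup>();
  for (const group of [...all, ...current]) {
    const existing = byDate.get(group.date);
    if (existing) {
      existing.items.push(...group.items);
    } else {
      // Copy the group so the input arrays are never mutated.
      byDate.set(group.date, { date: group.date, items: [...group.items] });
    }
  }
  return Array.from(byDate.values());
}

const alreadyLoaded: SessionGroup[] = [
  { date: '2019-01-05', items: [{ date: '2019-01-05', id: 'a' }] },
];
const newlyLoaded: SessionGroup[] = [
  { date: '2019-01-05', items: [{ date: '2019-01-05', id: 'b' }] },
  { date: '2019-02-03', items: [{ date: '2019-02-03', id: 'c' }] },
];

const merged = mergeGroups(alreadyLoaded, newlyLoaded);
console.log(merged.map(g => `${g.date}: ${g.items.length}`));
```

A function of this shape could serve as the accumulator passed to `scan`, as long as `scan` is also given an empty array as its seed.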
Solution
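One likely cause is that `loadAllSessions` returns the observable as it was at the moment of the call. Reassigning `this.sessions$` inside the `date.subscribe` callback later on does not change the observable the component already holds, so each new date only becomes visible after calling the method and subscribing again. A possible approach, sketched below under simplifying assumptions, is to build a single stream from the date subject: `mergeMap` starts a request for every emitted date, and `scan` with an explicit seed keeps everything received so far. The `Page` type, the fake `loadPage`, and the month strings are hypothetical stand-ins for the real API, not the asker's code.

```typescript
import { EMPTY, Observable, Subject, of } from 'rxjs';
import { expand, map, mergeMap, scan } from 'rxjs/operators';

// Hypothetical stand-in for the API response type in the question.
interface Page { items: string[]; nextToken: string | null; }

// Fake paged backend (assumption): every month has exactly two pages.
function loadPage(token: string | null, month: string): Observable<Page> {
  return token === null
    ? of<Page>({ items: [`${month}-a`, `${month}-b`], nextToken: `${month}#2` })
    : of<Page>({ items: [`${month}-c`], nextToken: null });
}

// All pages of one month: keep following nextToken until it is null.
function requestMonth(month: string): Observable<string[]> {
  return loadPage(null, month).pipe(
    expand(page => (page.nextToken ? loadPage(page.nextToken, month) : EMPTY)),
    map(page => page.items)
  );
}

// One long-lived stream: each emitted month triggers a request, and
// scan (seeded with an empty array) accumulates old and new items.
function allSessions(month$: Observable<string>): Observable<string[]> {
  return month$.pipe(
    mergeMap(month => requestMonth(month)),
    scan((all, current) => [...all, ...current], [] as string[])
  );
}

const month$ = new Subject<string>();
const seen: string[][] = [];

// Subscribe once; later months arrive through the same subscription.
allSessions(month$).subscribe(items => seen.push(items));

month$.next('2019-01');
month$.next('2019-02');
console.log(seen[seen.length - 1]);
```

With this shape the component would subscribe once (for example through the `async` pipe) and only call `next` on the date subject in `fetchNextMonth` and `fetchPreviousMonth`. If responses for different months must never interleave, `concatMap` could replace `mergeMap`.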