angular - 如何使用 RxJs 定期抓取 REST API?
问题描述
我正在使用 RxJs 的 IntervalObservable 每秒轮询 REST API 以获取新的传感器数据。REST API 以在 10 秒内进行的传感器测量的“桶”进行响应,因此来自 API 的响应还可能包含一个 HTTP 标头“下一个”,该标头指向更新的传感器数据桶(如果可用)。
我当前的实现(见下文)有两个问题:
- 传感器数据获取从过去的时间戳开始,这可能意味着获取大量数据桶。如果用户离开使用该服务的 Angular 组件,它会破坏该服务,但初始数据获取仍在继续。我觉得我应该跟踪 getSensorObservations() 在 updateVisualization() 方法中返回的 Observable,并在销毁服务时取消订阅,但这意味着,每当我开始使用 IntervalObservable 调用 updateVisualization() 时,我都会有“一个(间隔)可观察的可观察的”,我需要杀死两者。我认为这不是最好的方法。
- 初始 updateVisualization() 可能需要一些时间来获取所有必要的历史数据,但是无论如何都会使用 IntervalObservable 启动下一个 updateVisualization() 调用,而无需等待初始调用。
你对这些混合的可观察物有什么建议吗?
export class WidgetService {
private widget: Widget;
private visualizer: any;
private updateScheduler: Subscription;
private timestampOfMostRecentObservation?: number;
constructor(private sensorGateway: SensorGatewayCommunicationService) { }
public initializeVisualization() {
this.visualizer = new TimeSeriesLineChartWithTimeRangeSelector();
this.visualizer.draw(`widget-${this.widget.id}-visualization`, this.widget.name, this.widget.seriesName);
// First update of the visualization with sensor data since a timestamp in the past (121 seconds ago here):
const initialFromTimestamp = Date.now() - 121 * 1000;
this.updateVisualization(initialFromTimestamp);
// Next updates of the visualization are scheduled every second:
this.updateScheduler = IntervalObservable.create(1000)
.subscribe(() => this.updateVisualization(this.timestampOfMostRecentObservation));
}
public destroy() {
this.updateScheduler.unsubscribe();
this.visualizer.destroy();
}
private updateVisualization(fromTimestamp: number) {
const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
this.getSensorObservations(urlForNewObservations)
.pipe(
expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/
if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
return this.getSensorObservations(nextUrl);
} else {
return empty();
}
}),
concatMap(({sensorObservations}) => sensorObservations),
)
.subscribe((sensorObservations: [number, number][]) => {
const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
this.visualizer.update(downsampledObservations);
});
}
private getSensorObservations(urlForNewObservations: string): Observable<{
sensorObservations: object[],
nextUrl: string | null
}> {
return this.sensorGateway.getApiResource(urlForNewObservations).pipe(
map(response => {
if ('values' in response.body) {
return {
sensorObservations: response.body['values'].map(observation => [
observation[0],
observation[1]
]),
nextUrl: this.getNextLinkUrl(response)
};
} else {
return null;
}
})
);
}
private getNextLinkUrl(response: HttpResponse<object>): string | null {
if (response.headers.has('link')) {
const linkHeader = response.headers.get('link');
/* Example of a linkHeader:
*'</sensors/1/properties/1/observations/20180711/12/19?from=1531311594456>; rel="self",
* </sensors/1/properties/1/observations/20180711/12/18>; rel="prev",
* </sensors/1/properties/1/observations/20180711/12/20>; rel="next"' */
const links = linkHeader.split(',');
const nextLink = links.find(link => link.endsWith('; rel="next"'));
if (nextLink) {
return nextLink.substring(nextLink.indexOf('<') + 1, nextLink.indexOf('>'));
} else {
return null;
}
}
}
}
解决方案
我不会使用订阅一个 observable 来触发另一个,而是将问题抛在脑后,并创建一个单独的 observable 来满足您的需求。
我会在你的初始化方法中提出这样的建议:
let fromTimestamp = Date.now() - 121 * 1000;
// Create a base observable, doesn't really matter what it is
this.subscription = of(true).pipe(
// Map to the right call fur the current time
flatMap(() => {
const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
return this.getSensorObservations(urlForNewObservations);
}),
// Repeat the REST call while the sensor returns a next URL:
expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/
if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
// Set the fromTimestamp for the next set of observations.
fromTimestamp = this.parseTimestamp(nextUrl, fromTimestamp);
return this.getSensorObservations(nextUrl);
} else {
return empty();
}
}),
concatMap(({sensorObservations}) => sensorObservations),
// Keep repeating this
repeat(),
// But wait a second between each one
delay(1000),
// Terminate the whole thing when the service is destroyed / stopped.
takeWhile(() => !this.destroyed)
).subscribe((sensorObservations: [number, number][]) => {
const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
this.visualizer.update(downsampledObservations);
});
您需要实现 parseTimestamp 以解析来自 URL 或类似内容的相关下一个时间戳。
然后实现 ngOnDestroy 将 this.destroyed 设置为 true 和 do if (this.subscription) this.subscription.unsubscribe();
,这将在服务被销毁时终止订阅 - 如果您还想手动控制它,请在您的 destroy 方法中手动将其设置为 true/unsubscribe 。
推荐阅读
- c++ - 如何从 lambda 内部设置不可复制的捕获变量
- ruby-on-rails - 如何在多租户 Rails 6 应用程序中使用水平分片?
- git - 如何正确获取远程分支?
- python - 是否可以在命令行中运行 fastapi?
- api - 如何从 Greenplum 数据更新 Neo4j 的描述
- docker - docker 用户名交互导致“我没有名字”错误?
- css - 为什么设置 max-width: 100% 的图像有效地将最大值设置为其原始宽度?
- git - Tomcat 和 Jetty 在不同的分支上
- python - 如何过滤一列df中不在另一个具有相同列名的df中的值?
- python - 如果文件已经存在,为什么我的机器人不替换文件?