How do I poll a REST API periodically with RxJs?

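Problem description

I am using RxJs's IntervalObservable to poll a REST API every second for new sensor data. The REST API responds with "buckets" of sensor measurements taken within a 10-second window, so a response may also carry an HTTP Link header with a rel="next" entry that points to a more recent bucket of sensor data (if one is available).

My current implementation (see below) has two problems:

Do you have any suggestions for combining these observables?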

export class WidgetService {
  private widget: Widget;
  private visualizer: any;
  private updateScheduler: Subscription;
  private timestampOfMostRecentObservation?: number;

  constructor(private sensorGateway: SensorGatewayCommunicationService) { }

  public initializeVisualization() {
    this.visualizer = new TimeSeriesLineChartWithTimeRangeSelector();
    this.visualizer.draw(`widget-${this.widget.id}-visualization`, this.widget.name, this.widget.seriesName);
    // First update of the visualization with sensor data since a timestamp in the past (121 seconds ago here):
    const initialFromTimestamp = Date.now() - 121 * 1000;
    this.updateVisualization(initialFromTimestamp);
    // Next updates of the visualization are scheduled every second:
    this.updateScheduler = IntervalObservable.create(1000)
      .subscribe(() => this.updateVisualization(this.timestampOfMostRecentObservation));
  }

  public destroy() {
    this.updateScheduler.unsubscribe();
    this.visualizer.destroy();
  }

  private updateVisualization(fromTimestamp: number) {
    const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
    this.getSensorObservations(urlForNewObservations)
      .pipe(
        expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/
          if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
            return this.getSensorObservations(nextUrl);
          } else {
            return empty();
          }
        }),
        concatMap(({sensorObservations}) => sensorObservations),
      )
      .subscribe((sensorObservations: [number, number][]) => {
        const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
        this.visualizer.update(downsampledObservations);
      });
  }

  private getSensorObservations(urlForNewObservations: string): Observable<{
    sensorObservations: object[],
    nextUrl: string | null
  }> {
    return this.sensorGateway.getApiResource(urlForNewObservations).pipe(
      map(response => {
        if ('values' in response.body) {
          return {
            sensorObservations: response.body['values'].map(observation => [
              observation[0],
              observation[1]
            ]),
            nextUrl: this.getNextLinkUrl(response)
          };
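        } else {
          // No values in the response: return an empty bucket so that the
          // destructuring in expand() and concatMap() does not throw on null.
          return { sensorObservations: [], nextUrl: null };
        }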
      })
    );
  }

  private getNextLinkUrl(response: HttpResponse<object>): string | null {
    if (response.headers.has('link')) {
      const linkHeader = response.headers.get('link');
      /* Example of a linkHeader:
       *'</sensors/1/properties/1/observations/20180711/12/19?from=1531311594456>; rel="self",
       * </sensors/1/properties/1/observations/20180711/12/18>; rel="prev",
       * </sensors/1/properties/1/observations/20180711/12/20>; rel="next"' */
      const links = linkHeader.split(',');
      const nextLink = links.find(link => link.endsWith('; rel="next"'));
      if (nextLink) {
        return nextLink.substring(nextLink.indexOf('<') + 1, nextLink.indexOf('>'));
      } else {
        return null;
      }
    }
    // No Link header at all: there is no next bucket to follow.
    return null;
  }
}

Tags: angular, typescript, rxjs

Solution


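Rather than using a subscription to one observable to trigger another, I would turn the problem on its head and create a single observable that does everything you need.

I would suggest something like this in your initialize method: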

// Assumed imports (RxJS 6 style, matching the question's code):
//   import { of, empty } from 'rxjs';
//   import { flatMap, expand, concatMap, repeatWhen, delay, takeWhile } from 'rxjs/operators';
let fromTimestamp = Date.now() - 121 * 1000;
// Create a base observable, doesn't really matter what it is
this.subscription = of(true).pipe(
    // Map to the right call for the current time
    flatMap(() => {
        const urlForNewObservations = this.widget.sensorMeasurementsUrl + `?from=${fromTimestamp.toString()}`;
        return this.getSensorObservations(urlForNewObservations);
    }),

    // Repeat the REST call while the sensor returns a next URL:
    expand(({sensorObservations, nextUrl}) => { // https://ncjamieson.com/understanding-expand/
      if (sensorObservations && sensorObservations.length > 0 && nextUrl) {
        // Set the fromTimestamp for the next set of observations.
        fromTimestamp = this.parseTimestamp(nextUrl, fromTimestamp);
        return this.getSensorObservations(nextUrl);
      } else {
        return empty();
      }
    }),
    concatMap(({sensorObservations}) => sensorObservations),

    // Keep repeating this, but wait a second after each run completes.
    // (repeat() followed by delay(1000) would only shift every emission by one
    // second; the resubscription itself would still happen immediately.)
    repeatWhen(completed => completed.pipe(delay(1000))),

    // Terminate the whole thing when the service is destroyed / stopped.
    takeWhile(() => !this.destroyed)
).subscribe((sensorObservations: [number, number][]) => {
    const downsampledObservations = this.downsampleSensorObservations(sensorObservations);
    this.visualizer.update(downsampledObservations);
});

You will need to implement parseTimestamp so that it extracts the relevant next timestamp from the URL (or from wherever else it is available).
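
As a rough illustration, parseTimestamp might look something like the sketch below. It is a guess based only on the example Link header in the question, not the real API contract: it assumes that a from=<milliseconds> query parameter wins when present, and that otherwise the path ends in YYYYMMDD/HH/mm in UTC (the example from value, 1531311594456, is 12:19:54 UTC, which lines up with the .../20180711/12/19 bucket). It is written as a standalone function here; in the service it would be a private method.

```typescript
// Hypothetical helper: derive the next `from` timestamp (ms since epoch) from a "next" URL.
// Assumption 1: a `from=<milliseconds>` query parameter, when present, is authoritative.
// Assumption 2: otherwise the path ends in YYYYMMDD/HH/mm, interpreted as UTC.
function parseTimestamp(nextUrl: string, fallback: number): number {
  const fromMatch = /[?&]from=(\d+)/.exec(nextUrl);
  if (fromMatch) {
    return Number(fromMatch[1]);
  }

  const bucketMatch = /\/(\d{4})(\d{2})(\d{2})\/(\d{2})\/(\d{2})(?:[?#]|$)/.exec(nextUrl);
  if (bucketMatch) {
    const [year, month, day, hour, minute] = bucketMatch.slice(1).map(Number);
    // Date.UTC expects a zero-based month.
    return Date.UTC(year, month - 1, day, hour, minute);
  }

  // Unrecognized URL shape: keep the previous timestamp rather than guessing.
  return fallback;
}
```

With this version, following a next link moves fromTimestamp to the start of the next bucket, so the following poll cycle starts from there. If the real API encodes buckets differently, only the second regular expression should need to change.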

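Then implement ngOnDestroy so that it sets this.destroyed to true and calls if (this.subscription) this.subscription.unsubscribe();, which terminates the subscription when the service is destroyed. If you also want to stop it by hand, set the flag to true and unsubscribe in your destroy method as well.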
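
A minimal sketch of that teardown logic, kept free of Angular and RxJS imports so that it stands on its own. The class name PollingLifecycleSketch and the Unsubscribable interface are stand-ins invented for illustration; in the real WidgetService the subscription field would hold the RxJS Subscription returned by subscribe().

```typescript
// Minimal stand-in for an RxJS Subscription, so the sketch has no dependencies.
interface Unsubscribable {
  unsubscribe(): void;
}

class PollingLifecycleSketch {
  // Read by takeWhile(() => !this.destroyed) in the polling pipeline.
  destroyed = false;
  subscription?: Unsubscribable;

  // Angular calls ngOnDestroy on a service when the injector providing it is destroyed.
  ngOnDestroy(): void {
    this.destroy();
  }

  // Can also be called by hand to stop polling early; calling it twice is harmless.
  destroy(): void {
    this.destroyed = true;
    if (this.subscription) {
      this.subscription.unsubscribe();
      this.subscription = undefined;
    }
  }
}
```

Setting the flag and unsubscribing are both useful here: takeWhile only checks the flag when a value is emitted, whereas unsubscribe stops the polling loop straight away.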

