首页 > 解决方案 > 使用响应式扩展定期保存更​​改

问题描述

我有一个 Angular 应用程序,它有一个非常大的mat-table. 该表的大部分单元格都是mat-select组件(来自Angular Material),当其中的值发生变化时mat-select,需要保存,但要遵循以下规则:

  1. 每 7 秒保存一次自上次保存以来所做的所有更改。
  2. 如果一行被认为是完整的,则在那个时刻触发保存,其中包含自上次保存以来所做的所有更改。

我的进攻计划是这样的:

  1. 在任何selectionChange任何一个上mat-select,将整个数据对象发射到一个Subject.
  2. 订阅Subject它,通过以下管道运行它:
    • 根据上述条件缓冲可观察流
    • 过滤掉空数组(缓冲区窗口中没有发出任何内容)
    • 将整个更改数组发布到服务器。服务器知道如何处理它。
  3. 缓冲区条件本身就是一个 Observable,只要它发出任何东西就会触发缓冲区。
    • 我希望它在 7 秒过去或数据行完成时发出。所以我使用combineLatest组合两个 Observables,一个interval(7000)Subjectof 来保存数据,过滤以仅获取完整的数据;并且只取其中任何一个发出的最后一个。

这是我所做的(简化):

我的.component.html

  <mat-table>
  <ng-container matColumnDef='someData'>
    <mat-header-cell *matHeaderCellDef>Some Data</mat-header-cell>
    <mat-cell *matCellDef='let row'>
      <mat-form-field>
        <mat-select [(ngModel)]='row.someData' (selectionChange)='dataToSave.next(row)'>
          <mat-option [value]='3'>High</mat-option>
          <mat-option [value]='2'>Medium</mat-option>
          <mat-option [value]='1'>Low</mat-option>
        </mat-select>
      </mat-form-field>
    </mat-cell>
  </ng-container>
  ...
</mat-table>

我的.component.ts

import { Component, OnInit, AfterViewInit, OnDestroy } from '@angular/core';
import { DataService } from '../service/data.service';
import { Subject } from 'rxjs/Subject';
import { filter, switchMap, buffer, startWith } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
import { combineLatest } from 'rxjs/observable/combineLatest';

@Component({
  selector: 'app-data-grid',
  templateUrl: './data-grid.component.html',
  styleUrls: ['./data-grid.component.scss']
})
export class DataGridComponent implements OnInit, AfterViewInit, OnDestroy {
  dataToSave = new Subject<any>();

  // trigger the buffer either when 7 seconds has passed, or a row has been completed.
  bufferCondition = combineLatest(
    interval(7000).pipe(
      startWith(0)
    ),
    this.dataToSave.pipe(
      startWith({}),
      filter(row => this.checkAllFieldsCompleted(row))
    )
  );

  constructor(private dataService: DataService) { }

  ngAfterViewInit() {
    this.dataToSave.pipe(
      buffer(this.bufferCondition),
      filter(dataArray => data && dataArray.length > 0),
      switchMap(dataArray => this.dataService.saveData(dataArray))
    ).subscribe(dataSaved => { 
      /* success */ 
    }, err => { 
      /* error */
    });
  }

  checkAllFieldsCompleted(row: any): boolean {
    return /* logic to check if complete */
  }
    ...
}

我认为这将是反应式扩展的完美使用!但我似乎无法让它发挥作用。如果我bufferTime单独使用运算符,一切正常。所以我相信错误在于我如何定义我的自定义缓冲区条件。

标签: angularrxjsreactivex

解决方案


经过一些挖掘和大量试验和错误,我有了我的解决方案。

这个网站我了解到

请注意,combineLatest 在每个 observable 发出至少一个值之前,它不会发出初始值

在我的缓冲条件下,我试图结合两个 observables:7 秒间隔,以及当一行完成时。我注意到只有我第一次完成一行之后才会开始预期的行为 - 只有这样它才会开始每 7 秒保存一次。问题出在observable上的filter操作员。直到两个observables 都发出了一些东西,才发出任何东西。dataToSavecombineLatest

我改变了这段代码:

bufferCondition = combineLatest(
  interval(7000).pipe(
    startWith(0)
  ),
  this.dataToSave.pipe(
    startWith({}),
    filter(row => this.checkAllFieldsCompleted(row))
  )
);

对此:

bufferCondition = combineLatest(
  interval(7000),
  this.dataToSave.pipe(
    filter((row, index) => index === 0 || this.checkAllFieldsCompleted(row))
  )
);

它工作得很好!基本上,我不需要startWith操作员,并且在 上filter,我让 observable 发出对数据的第一次更改,但在一行完成之前没有其他更改。


推荐阅读