首页 > 解决方案 > 添加新元素并且它是第一个元素时,RXJS 启动缓冲区

问题描述

下面有更新

我正在尝试创建一个 id 字符串的缓冲区。我希望这个缓冲区关闭两个谓词:

  1. 在我决定的一段时间后超时
  2. 它达到我决定的最大缓冲区大小

一旦缓冲区在这两个谓词中的任何一个上关闭,我希望它将它收集的 id 字符串数组传递给另一个函数。我下面的代码是我想要传递数据的确切设置,但我是 RXJS 的新手!

class TestClass {
  private ids: Array<string>;

  private id$: Subject<string> = new Subject();
  private buffered$: Observable<Array<string>>;
  private timer$: Observable<number>;

  constructor() {
    this.ids = [];

    this.id$ = new Subject<string>();
    this.id$.subscribe(); 

    this.timer$ = interval(2000); // timeoutTimer

    this.buffered$ = this.id$.pipe(buffer(this.timer$));
    this.buffered$.subscribe((idBuffer) => console.log("Id buffer: " + idBuffer));
  }

  fun1(id: string, x: number) {
    this.ids.push(id);
    this.id$.next(id); // Source data to be Observed and buffered 

    return new Promise((reslove) => {
      // I want to call fun2 here with 
      //  1. array from buffer Observable
      //  2. the x value from this function
      resolve(fun2(buffered, x))
    }); 
  }

  fun2(ids: Array<string>, x: number) {
    console.log('In function 2!');
  }
}

let test = new TestClass();

setInterval(() => {
  test.fun1('hey' + Math.floor(Math.random() * 10), Math.random());
}, 300);

更新:

我更新了伪代码,描述了我面临的问题以及如何解决它。

问题:本质上,我多次调用 fun1 以将内容添加到缓冲区中,并且一旦达到我指定的时间限制或最大缓冲区大小,我想调用 fun2!然后我希望 fun1 返回 fun2 返回的相应数据对象。所以我在每次调用 fun1 时都在批处理我的调用而不是调用 fun2,我想等待缓冲区或超时并立即返回所有数据!

我需要找到一种方法,只在第一次调用 fun1 时订阅它,然后等待它完成返回所有元素,然后一旦缓冲区为空,再次订阅下一个要添加到缓冲区中的第一个元素

/*
   * PROBLEM: I want this to run only for the first addition of an ID string to a new buffer
   * but currently it is returning the new Promise<>... after every call to fun1! This results in
   * repeating elements being returned from the Promise
   *
   * Example:
   *
   * Buffer = ["id1", "id2", "id3", "id4", "id5"] // We called fun1 five times with these respective IDs
   *
   * // We are assuming the buffers max out at 3 elements
   * (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
   * (2) idBuffer = ["id2", "id3", "id4"] // After second emission of bufferedIdAccumulator$
   * (3) idBuffer = ["id3", "id4", "id5"] // After third emission of bufferedIdAccumulator$
   *
   * What I Want:
   * (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
   * (2) idBuffer = ["id1", "id2", "id3"] // After second emission of bufferedIdAccumulator$
   * (3) idBuffer = ["id1", "id2", "id3"] // After third emission of bufferedIdAccumulator$
   *
   * This is becasuse the first three calls to fun1 contain the first three objects I need to return so I
   * only want to return once for all three of these objects
   *
   */
  fun1(id: string, x: number): Promise<DataClass> {
    // console.log("In Function 1!");

    this.ids.push(id);
    this.id$.next(id); // Source data to be Observed and buffered

    console.log(`Adding id: ${id} to buffer!`);

    return new Promise<DataClass>(async (resolve) => {
      // I want to call fun2 here with
      //  1. array from buffer Observable
      //  2. the x value from this function
      let idBuffer = await firstValueFrom(this.bufferedIdAccumulator$);
      // let idBuffer = await new Promise<Array<string>>((resolve) => {
      //   this.bufferedIdAccumulator$.subscribe((idBuffer) => {
      //     console.log(idBuffer);
      //     resolve(idBuffer);
      //   });
      // });
      console.log(`<fun1> idBuffer: ${idBuffer}`);
      let dataClasses = this.fun2(idBuffer, x);
      let dataClass = dataClasses[0];
      resolve(dataClass);
    });
  }

  fun2(ids: Array<string>, x: number): Array<DataClass> {
    // console.log('In function 2!');
    // console.log('\nFunction 2:');
    // console.log(ids);

    let dataClasses = [];
    ids.forEach((id) => {
      dataClasses.push(new DataClass(id, x));
    });
    return dataClasses;
  }
}

class DataClass {
  id: string;
  x: number;

  constructor(id: string, x: number) {
    this.id = id;
    this.x = x;
  }
}

let test = new TestClass();

let x = 0;
setInterval(() => {
  test.fun1('hey' + x++, Math.random()); // Math.floor(Math.random() * 10), Math.random()
}, 2500);

标签: javascriptrxjs

解决方案


我想我找到了一个可行的解决方案!如果有人想看看,我将在这里发布我的示例代码!

import {
  Observable,
  Subject,
  firstValueFrom,
  Subscription,
} from 'rxjs';
import {
  bufferCount,
  filter,
  take,
  bufferTime,
  raceWith,
} from 'rxjs/operators';

class TestClass {
  private timeoutTime = 7000; // ms
  private maxBatchSize = 5; // ids

  private ids: Array<string>;

  private id$: Subject<string> = new Subject();
  private bufferedTime$: Observable<Array<string>>;
  private bufferedCount$: Observable<Array<string>>;
  private bufferedIdAccumulator$: Observable<Array<string>>;
  private idBufferSubscription$: Subscription;
  private firstBufferFromAccumulator: Promise<any>;

  constructor() {
    this.ids = [];

    // Subject to observe
    this.id$ = new Subject<string>();
    this.id$.subscribe();

    this.bufferedTime$ = this.id$.pipe(bufferTime(this.timeoutTime));
    this.bufferedCount$ = this.id$.pipe(
      bufferCount(this.maxBatchSize),
      filter((arr) => !!arr.length) // don't emit empty array
    );

    // returns the emission of the Observable that emitted firstChild
    this.bufferedIdAccumulator$ = this.bufferedCount$.pipe(
      raceWith(this.bufferedTime$),
      take(1) // takes 1 of the buffer
    );

    this.idBufferSubscription$ = null;
  }

  fun1(id: string, x: number): Promise<DataClass> {
    // console.log('In Function 1!');

    // Check if this.idBufferSubscription$ is null or a closed subscription
    if (this.idBufferSubscription$ === null || this.idBufferSubscription$.closed) {
      console.log("idBufferSubscription$ null or closed!");
      this.idBufferSubscription$ = this.bufferedIdAccumulator$.subscribe();
      this.firstBufferFromAccumulator = firstValueFrom(this.bufferedIdAccumulator$);
    }

    this.ids.push(id);
    this.id$.next(id); // Source data to be Observed and buffered

    console.log(`Adding id: ${id} to buffer!`);

    return new Promise<DataClass>((resolve) => {
      // I want to call fun2 here with
      //  1. array from buffer Observable
      //  2. the x value from this function
      this.firstBufferFromAccumulator.then((idBuffer) => {
        console.log(`<fun1>(2) idBuffer: ${idBuffer}`);
        let dataClasses = this.fun2(idBuffer, x);
        let dataClass = dataClasses[0];
        resolve(dataClass);
      });
    });
  }

  fun2(ids: Array<string>, x: number): Array<DataClass> {
    let dataClasses = [];
    ids.forEach((id) => {
      dataClasses.push(new DataClass(id, x));
    });
    return dataClasses;
  }
}

class DataClass {
  id: string;
  x: number;

  constructor(id: string, x: number) {
    this.id = id;
    this.x = x;
  }
}

let test = new TestClass();

let x = 0;
setInterval(() => {
  test.fun1('hey' + x++, Math.random()); // Math.floor(Math.random() * 10), Math.random()
}, 2500);

仅当它是添加到缓冲区中的第一个元素时才会订阅观察者,并且我有另一个变量 firstBufferFromAccumulator 将返回从 bufferedIdAccumulator$ 返回的字符串 [] 的承诺。我将它存储为一个变量,以便稍后在 fun1 中的 Promise 中返回它,以便它为缓冲区中对 fun1 的每次调用返回相同的字符串数组!希望这可以帮助!


推荐阅读