javascript - 添加新元素并且它是第一个元素时,RXJS 启动缓冲区
问题描述
下面有更新
我正在尝试创建一个 id 字符串的缓冲区。我希望这个缓冲区关闭两个谓词:
- 在我决定的一段时间后超时
- 它达到我决定的最大缓冲区大小
一旦缓冲区在这两个谓词中的任何一个上关闭,我希望它将它收集的 id 字符串数组传递给另一个函数。我下面的代码是我想要传递数据的确切设置,但我是 RXJS 的新手!
class TestClass {
private ids: Array<string>;
private id$: Subject<string> = new Subject();
private buffered$: Observable<Array<string>>;
private timer$: Observable<number>;
constructor() {
this.ids = [];
this.id$ = new Subject<string>();
this.id$.subscribe();
this.timer$ = interval(2000); // timeoutTimer
this.buffered$ = this.id$.pipe(buffer(this.timer$));
this.buffered$.subscribe((idBuffer) => console.log("Id buffer: " + idBuffer));
}
fun1(id: string, x: number) {
this.ids.push(id);
this.id$.next(id); // Source data to be Observed and buffered
return new Promise((reslove) => {
// I want to call fun2 here with
// 1. array from buffer Observable
// 2. the x value from this function
resolve(fun2(buffered, x))
});
}
fun2(ids: Array<string>, x: number) {
console.log('In function 2!');
}
}
let test = new TestClass();
setInterval(() => {
test.fun1('hey' + Math.floor(Math.random() * 10), Math.random());
}, 300);
更新:
我更新了伪代码,描述了我面临的问题以及如何解决它。
问题:本质上,我多次调用 fun1 以将内容添加到缓冲区中,并且一旦达到我指定的时间限制或最大缓冲区大小,我想调用 fun2!然后我希望 fun1 返回 fun2 返回的相应数据对象。所以我在每次调用 fun1 时都在批处理我的调用而不是调用 fun2,我想等待缓冲区或超时并立即返回所有数据!
我需要找到一种方法,只在第一次调用 fun1 时订阅它,然后等待它完成返回所有元素,然后一旦缓冲区为空,再次订阅下一个要添加到缓冲区中的第一个元素
/*
* PROBLEM: I want this to run only for the first addition of an ID string to a new buffer
* but currently it is returning the new Promise<>... after every call to fun1! This results in
* repeating elements being returned from the Promise
*
* Example:
*
* Buffer = ["id1", "id2", "id3", "id4", "id5"] // We called fun1 five times with these respective IDs
*
* // We are assuming the buffers max out at 3 elements
* (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
* (2) idBuffer = ["id2", "id3", "id4"] // After second emission of bufferedIdAccumulator$
* (3) idBuffer = ["id3", "id4", "id5"] // After third emission of bufferedIdAccumulator$
*
* What I Want:
* (1) idBuffer = ["id1", "id2", "id3"] // After first emission of bufferedIdAccumulator$
* (2) idBuffer = ["id1", "id2", "id3"] // After second emission of bufferedIdAccumulator$
* (3) idBuffer = ["id1", "id2", "id3"] // After third emission of bufferedIdAccumulator$
*
* This is becasuse the first three calls to fun1 contain the first three objects I need to return so I
* only want to return once for all three of these objects
*
*/
fun1(id: string, x: number): Promise<DataClass> {
// console.log("In Function 1!");
this.ids.push(id);
this.id$.next(id); // Source data to be Observed and buffered
console.log(`Adding id: ${id} to buffer!`);
return new Promise<DataClass>(async (resolve) => {
// I want to call fun2 here with
// 1. array from buffer Observable
// 2. the x value from this function
let idBuffer = await firstValueFrom(this.bufferedIdAccumulator$);
// let idBuffer = await new Promise<Array<string>>((resolve) => {
// this.bufferedIdAccumulator$.subscribe((idBuffer) => {
// console.log(idBuffer);
// resolve(idBuffer);
// });
// });
console.log(`<fun1> idBuffer: ${idBuffer}`);
let dataClasses = this.fun2(idBuffer, x);
let dataClass = dataClasses[0];
resolve(dataClass);
});
}
fun2(ids: Array<string>, x: number): Array<DataClass> {
// console.log('In function 2!');
// console.log('\nFunction 2:');
// console.log(ids);
let dataClasses = [];
ids.forEach((id) => {
dataClasses.push(new DataClass(id, x));
});
return dataClasses;
}
}
class DataClass {
id: string;
x: number;
constructor(id: string, x: number) {
this.id = id;
this.x = x;
}
}
let test = new TestClass();
let x = 0;
setInterval(() => {
test.fun1('hey' + x++, Math.random()); // Math.floor(Math.random() * 10), Math.random()
}, 2500);
解决方案
我想我找到了一个可行的解决方案!如果有人想看看,我将在这里发布我的示例代码!
import {
Observable,
Subject,
firstValueFrom,
Subscription,
} from 'rxjs';
import {
bufferCount,
filter,
take,
bufferTime,
raceWith,
} from 'rxjs/operators';
class TestClass {
private timeoutTime = 7000; // ms
private maxBatchSize = 5; // ids
private ids: Array<string>;
private id$: Subject<string> = new Subject();
private bufferedTime$: Observable<Array<string>>;
private bufferedCount$: Observable<Array<string>>;
private bufferedIdAccumulator$: Observable<Array<string>>;
private idBufferSubscription$: Subscription;
private firstBufferFromAccumulator: Promise<any>;
constructor() {
this.ids = [];
// Subject to observe
this.id$ = new Subject<string>();
this.id$.subscribe();
this.bufferedTime$ = this.id$.pipe(bufferTime(this.timeoutTime));
this.bufferedCount$ = this.id$.pipe(
bufferCount(this.maxBatchSize),
filter((arr) => !!arr.length) // don't emit empty array
);
// returns the emission of the Observable that emitted firstChild
this.bufferedIdAccumulator$ = this.bufferedCount$.pipe(
raceWith(this.bufferedTime$),
take(1) // takes 1 of the buffer
);
this.idBufferSubscription$ = null;
}
fun1(id: string, x: number): Promise<DataClass> {
// console.log('In Function 1!');
// Check if this.idBufferSubscription$ is null or a closed subscription
if (this.idBufferSubscription$ === null || this.idBufferSubscription$.closed) {
console.log("idBufferSubscription$ null or closed!");
this.idBufferSubscription$ = this.bufferedIdAccumulator$.subscribe();
this.firstBufferFromAccumulator = firstValueFrom(this.bufferedIdAccumulator$);
}
this.ids.push(id);
this.id$.next(id); // Source data to be Observed and buffered
console.log(`Adding id: ${id} to buffer!`);
return new Promise<DataClass>((resolve) => {
// I want to call fun2 here with
// 1. array from buffer Observable
// 2. the x value from this function
this.firstBufferFromAccumulator.then((idBuffer) => {
console.log(`<fun1>(2) idBuffer: ${idBuffer}`);
let dataClasses = this.fun2(idBuffer, x);
let dataClass = dataClasses[0];
resolve(dataClass);
});
});
}
fun2(ids: Array<string>, x: number): Array<DataClass> {
let dataClasses = [];
ids.forEach((id) => {
dataClasses.push(new DataClass(id, x));
});
return dataClasses;
}
}
class DataClass {
id: string;
x: number;
constructor(id: string, x: number) {
this.id = id;
this.x = x;
}
}
let test = new TestClass();
let x = 0;
setInterval(() => {
test.fun1('hey' + x++, Math.random()); // Math.floor(Math.random() * 10), Math.random()
}, 2500);
仅当它是添加到缓冲区中的第一个元素时才会订阅观察者,并且我有另一个变量 firstBufferFromAccumulator 将返回从 bufferedIdAccumulator$ 返回的字符串 [] 的承诺。我将它存储为一个变量,以便稍后在 fun1 中的 Promise 中返回它,以便它为缓冲区中对 fun1 的每次调用返回相同的字符串数组!希望这可以帮助!
推荐阅读
- javascript - cookie删除字符的问题
- vb.net - 摆脱/替换 ActiveX 函数
- visual-c++ - Visual C++ 编译器错误?
- mysql - SQL:查找重复记录,但只返回最新的重复记录?
- swift - 在 SwiftUI 中更新树结构的 UI
- google-colaboratory - Google Colab:您的会话因未知原因而崩溃
- javascript - 通过特定语言动态提交 - JavaScript - React Hooks
- linux - 如何获取 gdb 调用堆栈跟踪?
- react-native - 失去理智 - 为什么我的 fetch 不重新渲染(redux)
- ruby-on-rails - 如何调查 Rails 中的异常?