angular - Rxjs:使用 scan 或 mergeMap 或任何 rxjs 在 X 秒后将 observables 数据流(grpc 服务)组合成一个数组
问题描述
我在 2 秒内收到一个数据流作为来自服务(grpc)的响应,比如 5 个流。每个流都是我订阅的可观察对象。我收到每个流后的处理逻辑更多,每个流都有一些重的 json 对象,其中包含类似 base64 编码的字节字符串,涉及 json 到复杂类型的转换和更多的条件逻辑。因此,当一个流的处理逻辑完成时,我无法验证其他流,因为它的处理速度非常快(像并发一样接近)并且被遗漏了。所以我需要在监听 X 秒后将所有这些 Y(例如 5)个流聚合到一个可观察的数组中。下面是代码片段。
MyTestService.ts:
@Injectable()
export class MyTestService {
client: GrpcMyTestServiceClient;
public readonly coreData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.coreData$ = this.listCoreStream();
}
listCoreStream(): Observable<TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
//Get stream data from service(grpc)
const stream = this.client.getCoreUpdates(req);
stream.on('data', (message: any) => {
obs.next(message.toObject() as TestReply.AsObject);
});
});
}
我的组件.ts
public testReply?: TestReply.AsObject;
private _subscription: Subscription;
constructor(private readonly _MyTestService: MyTestService) {
this._subscription = new Subscription();
}
ngOnInit(): void {
this._subscription = this._MyTestService.coreData$.subscribe((data) => {
if (data) {
let obj = JSON.parse(data);
//processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
}
});
}
由于数据太快,跨度太短,没有全部记录或处理。它看起来像同步问题,这在网络世界中是不可能的,满足某些条件的最后一个流会覆盖前一个流。为了避免这种情况并一次处理所有流,我需要将所有流组合/合并到一个数组中,以便我可以在订阅 observable 的组件中迭代它们中的每一个。流数据排序无关紧要。
我试图使用 rxjs 运算符,如timer
, mergemap
, concatmap
, scan
, merge
. 但是这些仍然是新手,无法找到正确的方法。下面是一个这样的尝试使用scan
但无法获得所需的结果,并且该数组具有空值,并且不确定如何从console.log
. 任何指针都会很有帮助,请提出建议。
解决方案尝试:
let temp: TestReply.AsObject[] = [];
let test = this._MyTestService.coreData$
.pipe(
mergeMap(_ => timer(5000)),
scan<any>((allResponses, currentResponse) =>
[...allResponses, currentResponse], this.temp),
).subscribe(console.log);
解决方案
这是我的解决方案,使用订阅中的下一个块将所有数组推送到一起,然后最终在完整块中执行操作。
MyTestService.ts:
@Injectable()
export class MyTestService {
client: GrpcMyTestServiceClient;
public readonly coreData$: Observable<TestReply.AsObject>;
constructor(private readonly http: HttpClient) {
this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
this.coreData$ = this.listCoreStream();
}
listCoreStream(): Observable<TestReply.AsObject> {
return new Observable(obs => {
const req = new SomeRequest();
//Get stream data from service(grpc)
const stream = this.client.getCoreUpdates(req);
stream.on('data', (message: any) => {
obs.next(message.toObject() as TestReply.AsObject);
});
});
stream.on('end', (message: any) => {
obs.complete();
});
});
}
我的组件.ts
public testReply?: TestReply.AsObject;
public dataArray = [];
private _subscription: Subscription;
constructor(private readonly _MyTestService: MyTestService) {
this._subscription = new Subscription();
}
ngOnInit(): void {
this._subscription = this._MyTestService.coreData$.subscribe({
next: (data) => {
if (data) {
this.dataArray.push(JSON.parse(data));
}
},
complete: () => {
// this.dataArray
// above will be your final data
// processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
})
}
推荐阅读
- javascript - React ssr:'您可能需要适当的加载器来处理此文件类型,当前没有配置加载器来处理此文件'
- elixir - 是否可以使用 ecto 在同一个实体/表上实现多对多关联
- json - 在 Tabulator 中获取对象而不是数组
- html - 有人可以告诉我服务在 Angular 9 中是如何工作的吗?
- c# - 使用 C# 从 Unity 中的静态方法调用 IEnumerator 的最佳方法
- image - PDFSharp.Xamarin.Forms:将图像从资源添加到 PDF
- javascript - 如何使用 slick js carousel 展示应用图像?
- spring - Spring Security Oauth2 removeAccessToken 不起作用
- javascript - 重定向后弹出窗口上的Javascript事件监听器
- c# - c# Windows 应用程序中窗体拖动时图形闪烁