首页 > 解决方案 > RxJS 等待订阅 Observable 完成

问题描述

我正在尝试使用 NestJS 实现 gRPC 并使用 RxJS Observerable 来处理服务器流。

在下面的代码中,我尝试将值放入observable数组results。该函数在正确打印所有结果findAllRepos 返回空列表console.log(value)subscribe

  findAllRepos(): Repository[] {
    const results: Repository[] = [];

    const observable = this.mainService.findAllRepos({});
    observable.subscribe({
      next: (value) => {
        console.log(value);
        results.push(value);
      },
      error: (err) => console.log(err),
      complete: () => console.log(results)
    });

    return results;
  }

我认为问题在于函数在subscribe完成之前返回值。有什么解决方案可以解决这个问题吗?谢谢!

标签: typescriptrxjsnestjsgrpc-node

解决方案


问题

我认为问题在于该函数在订阅完成之前返回值。

函数 ( next, error, & ) 都会在未来某个时间complete由可观察对象(而不是您)调用。当您深入了解 RxJS 的核心内容时,您可以对这些函数的调用方式进行一些控制,但为了简单起见,最好想象一下这是由 RxJS 库不透明地处理的。

因此,没有办法做你想做的事。

问题甚至比这更糟。您遇到的问题存在于同步和异步代码之间的任何交互。您不能同步运行异步代码。在单线程环境(如 JavaScript)中,试图让一段同步的代码等待会立即使整个程序死锁。

考虑以下代码:您可能希望它在 1000 毫秒内输出“a 等于 0”,然后开始永远输出“a 等于 1” 。然而,实际发生的是,内部的代码setTimeout永远不会有机会运行,因为线程将陷入无限循环,打印“a 等于 0”

// Start with a = 0
let a = 0;
// After 1 second, set a = 1
setTimeout(() => a = 1, 1000);

// loop forever and print something based on value of a
while(true){ 
  if(a == 0) console.log("a equals 0");
  else console.log("a equals 1");
}

解决方案

管理异步代码的两种最流行的方法是通过 promises 或 observables。如果你想让一个函数异步返回一些东西,那么让它返回一个 promise 或 observable。

在你的情况下:

findAllRepos(): Observable<Repository[]> {
  
  const observable = this.mainService.findAllRepos({});
  return observable.pipe(
    tap(value => console.log("Repository value being added to array: ", value)),
    toArray(),
    tap({
      next: value => console.log("Result Array (Repository[]) : ", value),
      error: console.log,
      complete: () => console.log("findAllRepos observable complete")
    })
  );

}

然后在其他地方获取实际值:

findAllRepos().subscribe(repositories => {
  /* Do something with your array of repositories */
});

推荐阅读