首页 > 解决方案 > RxJS 自定义运算符不调用完成

问题描述

我有以下自定义运算符:

export function test() {
  return function(source) {
    return new Observable(observer => {

      return source.subscribe({
        next(value) {
          observer.next(value);
        },
        error(error) {
          observer.error(error);
        },
        complete() {
          console.log('completed')
          observer.complete();
        }
      })
    });
  }
}

问题是当我订阅一个完成的可观察对象时,我没有得到完整的回调,即我没有看到完成的日志。

interval(1000).pipe(
  test(),
  take(2)
).subscribe();

我错过了什么?

标签: javascriptrxjs

解决方案


好问题,现在让我们深入解释一下。

首先,让我们看看解决方案

function test(limitter) {
  return function(source) {
    return new Observable(observer => {
      return source.subscribe({
        next(value) {
          observer.next(value);
        },
        error(error) {
          observer.error(error);
        },
        complete() {
          console.log("completed action");
          observer.complete("completed value");
        }
      });
    });
  };
}

interval(1000)
  .pipe(
    take(2),
    test()
  )
  .subscribe(
    x => {
      console.log("result: ", x);
    },
    err => {
      console.log(err);
    },
    end => {
      console.log("Observable has been completed");
    }
  );

那么有什么区别,在这个片段中,take操作符在自定义test()操作符之前,这意味着每当我们达到所需的计数(在我们的例子中2),take操作符将返回完成的源代码,这将触发我们在后续订阅者中的完成方法(在我们的例子中,在自定义test操作符内部以及subscribe) 之后,source不会发出任何其他内容,因为它已经完成。

您可以查看源代码 => take(),如果有一些模糊的部分,请随时询问更多信息。


推荐阅读