首页 > 解决方案 > 仅当 observable 未完成时取消订阅

问题描述

我有一个 observable,我想在订阅者取消订阅时调用取消(拆卸)逻辑,但前提是源 observable 本身尚未完成(或失败)。

内置finalize运算符允许在unsubscribe发生时注册自定义回调,但只要取消订阅是由订阅者或源 observable 的完成引起的,它就会被调用。

我实现了这个辅助函数:

function withCancellation(source, onCancel) {
  return new Observable(subscriber => {
    let completed = false;

    const cancellable = source.pipe(
      tap({
        error: () => { completed = true; },
        complete: () => { completed = true; },
      })
    );

    const subscription = cancellable.subscribe(subscriber);
    subscription.add(() => { if (!completed) onCancel(); });

    return subscription;
  });
}

我可以使用以下方式:

const sourceStream = startJob(jobId); // returns source observable
const cancellableStream = withCancellation(sourceStream, () => stopJob(jobId));

有没有更简洁的方法可以使用任何内置原语来实现相同的目标?

标签: rxjsrxjs6

解决方案


推荐阅读