首页 > 解决方案 > 停止 Rxjs 观察者的下一次执行?

问题描述

我正在尝试使用 Rxjs 观察者的基本用例。这就是我正在做的事情:

const { Observable } = require('rxjs');

Observable.create(o => { 
  setTimeout(() => o.next('hi'), 1000); 
  setTimeout(() => { throw new Error('A mistake') }, 1500); 
  setTimeout(() => o.next('this should not print'), 2000) }
).subscribe({
  next: x => console.log(x),
  error: y => console.log('error', y),
  complete: () => console.log('done')
});

输出如下:

> hi
Error: A mistake
    at Timeout.setTimeout [as _onTimeout] (repl:1:89)
    at ontimeout (timers.js:498:11)
    at tryOnTimeout (timers.js:323:5)
    at Timer.listOnTimeout (timers.js:290:5)
> this should not print

我需要做的是停止下一次的执行,即使unsubscribe我订阅了它也不会停止下一次执行。

我也试过这样:

让订阅;

let source = Observable.create(o => { 
  try {
    setTimeout(() => o.next('hi'), 1000); 
    setTimeout(() => { throw new Error('A mistake') }, 1500); 
    setTimeout(() => o.next('this should not print'), 2000) 
  } catch (e) {
    subscription.unsubscribe();
    return 'error';
  }
});
subscription = source.subscribe({
  next: x => console.log(x),
  error: y => console.log('error', y),
  complete: () => console.log('done')
});

但没有机会……它没有停止。

我所拥有的代码不仅仅是设置超时我有异步和等待这样的代码:

let subscription;

let source = Observable.create(async o => { 
  try {
    o.next(await anEvent()); 
    o.next(await anEventThatThrowsAnException()); 
    o.next(await anEventThatIWantToAvoidDueToTheException()); 
  } catch (e) {
    subscription.unsubscribe();
    return 'error';
  }
});
subscription = source.subscribe({
  next: x => console.log(x),
  error: y => console.log('error', y),
  complete: () => console.log('done')
});

我怎样才能实现这个代码来停止“这不应该打印”?

标签: javascriptnode.jsrxjsrxjs6

解决方案


用 设置计时器后setTimeout,您将不会阻止它的执行unsubscribe

您将需要手动清除超时。

请注意,您可以将超时保存在变量中,var myTimeout = setTimeout(f,ms);然后您可以取消clearTimeout(myTimeout);

阅读更多:


推荐阅读