首页 > 解决方案 > 如何在 rxjs 中执行“onErrorContinue”

问题描述

我正在尝试恢复与onErrorContinueJava 反应堆核心 Java 示例中相同的主流

Flux.range(1, 5)
  .flatMap(n -> (n == 3) ? Mono.error(new Throwable("StoppedError")) : Mono.just(n))
  .onErrorContinue((throwable, o) -> { System.out.println("error with " + o); })
  .subscribe(System.out::println, System.out::println, System.out::println)

// 1
// 2
// error with 3
// 4
// 5

我怎样才能在 RXJS 中做到这一点?谢谢

直到现在已经尝试过

Observable.range(1, 5)
  .flatMap(v => v == 3 ? Observable.throwError(new Error("Stopped")): Observable.of(v))
  .subscribe(...logs)

标签: rxjs

解决方案


您需要将catchError运算符放在内部可观察对象上。抛出错误并提供错误值,例如-1

import { of, throwError, range } from "rxjs";
import { map, flatMap, catchError } from "rxjs/operators";

range(1, 5)
  .pipe(
    flatMap(v =>
      (v == 3 ? throwError(new Error("Stopped")) : of(v)).pipe(
        catchError(err => of(-1))
      )
    )
  )
  .subscribe(console.log);

Stackblitz:https ://stackblitz.com/edit/rxjs-dnz4kx?devtoolsheight=60


推荐阅读