首页 > 解决方案 > 如何创建 Observable来自无限流使用 RxJava2?

问题描述

我正在尝试将 RxJava 从 1 升级到 2。在我的旧代码中,我有如下方法:

private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

但现在在 RxJava 2 中我不能使用from. 这段代码相当于什么?我在这个问题中发现它是,fromIterable但我不知道如何将它与 Stream 一起使用。

或其他示例,这不仅适用于范围,还适用于任何无限流。

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}

标签: javajava-streamrx-javarx-java2reactive-streams

解决方案


使用generate()功能:

这是 kotlin 代码(扩展函数),但您只需稍微更改 lambda。这适用于任何流。

fun <T> Stream<T>.toFlowable(): Flowable<T> {
  return Flowable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

如果你愿意,你也可以使用Observable,但我不明白你为什么应该这样做。

fun <T> Stream<T>.toObservable(): Observable<T> {
  return Observable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

我认为在java中会是这样的:

public <T> Observable<T> streamToObservable(Stream<T> stream) {
  return Observable.generate(
    () -> stream.iterator(),
    (ite, emitter) -> {
      if (ite.hasNext()) {
        emitter.onNext(ite.next());
      } else {
        emitter.onComplete();
      }
    }
  );
}

所以你的代码会变成:

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return streamToObservable(intStream);
}

推荐阅读