java - 如何创建 Observable来自无限流使用 RxJava2?
问题描述
我正在尝试将 RxJava 从 1 升级到 2。在我的旧代码中,我有如下方法:
private Observable<Integer> reversRange(int from, int to) {
Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
return Observable.from(() -> intStream.iterator())
.takeWhile(n -> n > from)
.map(n -> n );
}
但现在在 RxJava 2 中我不能使用from
. 这段代码相当于什么?我在这个问题中发现它是,fromIterable
但我不知道如何将它与 Stream 一起使用。
或其他示例,这不仅适用于范围,还适用于任何无限流。
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return Observable.from(() -> intStream.iterator());
}
解决方案
使用generate()
功能:
这是 kotlin 代码(扩展函数),但您只需稍微更改 lambda。这适用于任何流。
fun <T> Stream<T>.toFlowable(): Flowable<T> {
return Flowable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
如果你愿意,你也可以使用Observable
,但我不明白你为什么应该这样做。
fun <T> Stream<T>.toObservable(): Observable<T> {
return Observable.generate(
Callable { iterator() },
BiConsumer { ite, emitter ->
if (ite.hasNext()) {
emitter.onNext(ite.next())
} else {
emitter.onComplete()
}
}
)
}
我认为在java中会是这样的:
public <T> Observable<T> streamToObservable(Stream<T> stream) {
return Observable.generate(
() -> stream.iterator(),
(ite, emitter) -> {
if (ite.hasNext()) {
emitter.onNext(ite.next());
} else {
emitter.onComplete();
}
}
);
}
所以你的代码会变成:
private Observable<Integer> numbers() {
Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
return streamToObservable(intStream);
}
推荐阅读
- windows - 如何挂钩 Windows x64、x86 上的任何 API 调用?
- recursion - FP-Growth 算法中的递归
- php - 作曲家没有做出好的动作
- node.js - 尝试将现有的 excel 文件发送到 UI,作为来自节点后端的响应
- javascript - 模态仅在多个按钮上显示最后一个按钮内容
- javascript - 如何使用 Node 和 node-rest-client 发出 Oauth2 REST 请求?
- python - 使用 Selenium 和 Python 在 Chrome 中同时登录多个帐户
- angular - 为什么我的异步函数可以在 Firefox 上运行,但在 Chrome 上却给我一个错误?
- c++ - Xcode 11 头文件问题
- ios - ViewController 交互式过渡平移手势识别器