java - 为什么 toMap 放置不当会导致 RXJava 出现问题?
问题描述
更新:我正在使用 RxJava 1.x
下面是一段代码:
private static void tryObservableToMap() {
bad();
good();
}
private static void good() {
System.out.println("GOOD CASE");
String goodOutput =
m(m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
.distinct(), "distinct")
.flatMap(s ->
m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
.map(intValue -> Pair.of(s, intValue)), "pair " + s)), "flatMap")
.toMap(Pair::getKey, Pair::getValue), "toMap")
.map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
.toBlocking()
.first();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\nOutput:");
System.out.println(goodOutput);
}
private static void bad() {
System.out.println("BAD CASE");
String badOutput =
m(m(m(m(Observable.from(ImmutableList.of("a","b","c","d")), "list")
.distinct(), "distinct")
.flatMap(s ->
m(m(m(Observable.fromCallable(() -> getIntForString(s)).subscribeOn(Schedulers.io()), "getInt " + s)
.map(intValue -> Pair.of(s, intValue)), "pair " + s)
.toMap(Pair::getKey, Pair::getValue), "toMap " + s)), "flatMap")
.map(map -> map.entrySet().stream().map(e -> e.getKey() + ": " + e.getValue()).collect(Collectors.joining("\n"))), "OUTER")
.toBlocking()
.first();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\nOutput:");
System.out.println(badOutput);
}
private static <T> Observable<T> m(final Observable<T> observable, final String name) {
return observable
.doOnSubscribe(() -> logRxLifecycleEvent(name, "subscribe"))
.doOnError((ex) -> logRxLifecycleEvent(name, "error: " + ex.getMessage()))
.doOnCompleted(() -> logRxLifecycleEvent(name, "complete"))
.doOnTerminate(() -> logRxLifecycleEvent(name, "terminating"))
.doAfterTerminate(() -> logRxLifecycleEvent(name, "terminated"))
.doOnUnsubscribe(() -> logRxLifecycleEvent(name, "unsubscribe"));
}
private static void logRxLifecycleEvent(final String name, final String event) {
System.out.println("\tRXLOG " + name + " observable " + event);
}
private static int getIntForString(String s) {
switch(s) {
case "a":
return 1;
case "b":
return 2;
case "c":
return 3;
case "d":
return 4;
default:
return 0;
}
}
好与坏的区别在于,对于坏的版本,我是.toMap
在内部调用.flatMap
而不是在之后调用.flatMap
。
如果您运行此代码,您将看到作为执行一部分的所有可观察对象的不同事件。
我想知道为什么“外部”可观察对象永远不会因坏情况而终止。哪位对RX有更深入了解的可以解释一下吗?
解决方案
RXLOG OUTER observable complete
缺少是因为在它上面的源之间存在竞争,toBlocking().first()
它完成了。它可能会过早取消订阅,因此上面的源可能没有机会发出onCompleted
. 在我的 i7 4770K 上,他们从不completed
为我打印。如果您替换first
为toIterable().iterator().next()
,它提供了必要的机会,您应该一直看到丢失的日志。
推荐阅读
- python - 超越关键字依赖的文本分类并推断实际含义
- reactjs - 预检(cors)请求服务器将来源更改为 * 后,chrome 不显示请求(但我查看响应正文)。如何解决问题?
- sql - 希望拥有另一个数据库的副本,该数据库会自动更新并具有一些附加列
- firebase - 如何获取快照元数据 Firestore?
- typescript - 如何在 Typescript 中实例化 File 对象?
- sql - 从数组中删除一个非唯一值
- lambda - 返回 CompletableFuture
- > 来自 CompletableFuture
> - html - 在水平导航栏中将链接居中
- java - java8 ZonedDateTime 不打印 EST 当前时间。我在这里做错了什么
- c# - 是否可以在同一个 MVC 控制器类中使用 2 个不同的授权方案?