java - 如何在现有的 Observable 上发布新的 CompletableFuture 结果?
问题描述
我会尽量简化代码和我遇到的问题,但这似乎有点过于复杂,所以我也愿意接受完全重构代码的解决方案。
简而言之,代码应该连续地流式传输数据。我拥有的是两个合并的 Observable,一个从 REST 请求中获取数据,一个从另一个来源获取数据。第一个 Observable 通过 CompletableFuture 获取数据,以防我们更快地从其他来源获取数据。实施此机制是为了尽可能快地获取数据,但避免发出不必要的 REST 请求并达到 API 限制。
通过说明一些代码“更容易”理解:
public Disposable myMethod() {
CompletableFuture restCallFuture = makeRequest();
Observable restObservable = Observable.fromPublisher(s -> restCallFuture.thenAccept( obj -> s.onNext(obj))
.exceptionally(throwable -> {
if (throwable.getCause() instanceof CancellationException) {
//we are good
});
Observable otherObservable = getDataContinuouslyFromOtherSource();
return Observable.merge(restObservable, otherObservable)
.subscribe(obj -> {
restCallFuture.cancel(true);
doWhatever(obj);
}
}
问题是,有时让我们说互联网连接断开,当它再次启动时,我会收到通知。在此通知上,我想重复 REST 请求而不重新启动应用程序,并以某种方式在同一个已返回的合并 Observable 实例上发布数据。事实上,我想重复整个请求取消机制,但不重新运行代码,只通过函数调用。
我尝试在不丢失引用的情况下重新实例化 CompletableFuture,但它不起作用。我试图阅读如何以某种方式打破这一行:
Observable.fromPublisher(s -> restCallFuture.thenAccept( obj -> s.onNext(obj)));
并使用更合适但没有解决任何问题的东西。
编辑:根据 akarnokd 的建议,我正在通过以下方式创建 restObservable:
Observable restObservable = ObservableInterop.fromFuture(restCallFuture);
我仍然不知道如何让它重做 REST 调用。
解决方案
推荐阅读
- scala - 从 SFTP 服务器递归下载目录内容
- regex - awk - 分配给变量的正则表达式 - 损坏
- ssis - 更新到 Visual Studio 2017 SSDT 后在 Visual Studio 2017 中使用脚本任务编辑器不再工作
- ruby - Ruby - 使用某些函数进行常量初始化会产生 NoMethodError
- c# - Xamarin.Essentials Share.RequestAsync() 在 iOS 上不起作用
- bash - Git哈希输出与sha1sum不匹配,为什么?
- angular - Angular Modal Gallery - 位置画廊图像
- c# - 如何在表达式中使用委托
- python - 从 Flutter App 接收 Python Cloud Function 中的变量
- c++ - 如何知道 std::exception 的异常类型