首页 > 解决方案 > 如何在现有的 Observable 上发布新的 CompletableFuture 结果?

问题描述

我会尽量简化代码和我遇到的问题,但这似乎有点过于复杂,所以我也愿意接受完全重构代码的解决方案。

简而言之,代码应该连续地流式传输数据。我拥有的是两个合并的 Observable,一个从 REST 请求中获取数据,一个从另一个来源获取数据。第一个 Observable 通过 CompletableFuture 获取数据,以防我们更快地从其他来源获取数据。实施此机制是为了尽可能快地获取数据,但避免发出不必要的 REST 请求并达到 API 限制。

通过说明一些代码“更容易”理解:

    public Disposable myMethod() {
        CompletableFuture restCallFuture = makeRequest();

        Observable restObservable = Observable.fromPublisher(s -> restCallFuture.thenAccept( obj -> s.onNext(obj))
            .exceptionally(throwable -> {
                if (throwable.getCause() instanceof CancellationException) {
                    //we are good
                });

        Observable otherObservable = getDataContinuouslyFromOtherSource();

        return Observable.merge(restObservable, otherObservable)
                .subscribe(obj -> {
                    restCallFuture.cancel(true);
                    doWhatever(obj);
                }
    }

问题是,有时让我们说互联网连接断开,当它再次启动时,我会收到通知。在此通知上,我想重复 REST 请求而不重新启动应用程序,并以某种方式在同一个已返回的合并 Observable 实例上发布数据。事实上,我想重复整个请求取消机制,但不重新运行代码,只通过函数调用。

我尝试在不丢失引用的情况下重新实例化 CompletableFuture,但它不起作用。我试图阅读如何以某种方式打破这一行:

Observable.fromPublisher(s -> restCallFuture.thenAccept( obj -> s.onNext(obj)));

并使用更合适但没有解决任何问题的东西。

编辑:根据 akarnokd 的建议,我正在通过以下方式创建 restObservable:

Observable restObservable = ObservableInterop.fromFuture(restCallFuture);

我仍然不知道如何让它重做 REST 调用。

标签: javarequestobservablerx-javacompletable-future

解决方案


推荐阅读