首页 > 解决方案 > 使用 Rx 调用两个链接的独立方法

问题描述

我有两个异步方法,必须在一个操作时调用。每个方法都可以成功完成或检索错误。如果出现错误,我必须再次重试调用每个方法,延迟 2 秒。意思是,我应该调用这两种方法,尽管其中一种方法有结果。在错误回调中,我想知道哪个方法错误发生了,或者两种方法都发生了错误。

看来我应该使用Completable它,但我绝对是 Rx 的新手。

private void method1(final CompletableEmitter e, String path){
Database.getInstance().getReference(path).addListener(new Listener() {
            @Override
            public void onDataChange(Data data) {
               //todo something
               e.onComplete();                
            }
            @Override
            public void onCancelled(DatabaseError databaseError) {
               e.onError(new Throwable(databaseError.getMessage()));
            }
        });
}

方法2是一样的。以下代码无法正常工作。

    Completable completable1 = Completable.create(method1(e););
    Completable completable2 = Completable.create(method2(e););

    completable1
            .doOnError(…)
            .retry(1)
            .andThen(completable2 //never called if completable1 gets onError each time
                    .retry(1)
                    .doOnError(…))
            .subscribe(…).dispose();

标签: androidrx-javarx-android

解决方案


你有很多方法可以做到这一点。我将仅限制解释如何使用两个Completables来实现这一点

假设您有两个可完成项:

Completable doSomething = ...
Completable doSomethingElse = ...

要按顺序执行这些操作,您可以使用andThen运算符连接它们。然后在发生错误时延迟重试,您可以使用retryWhen

doSomething.andThen(doSomethingElse)
    .retryWhen { Flowable.timer(2, TimeUnit.SECONDS) }
    .subscribe()

如果永久发生错误,上面的这段代码将无限重试。要超越,您可以使用以下方法限制尝试次数:

.retryWhen { errors ->
    val retryCounter = AtomicInteger()
    errors.flatMap {
        if (retryCounter.getAndIncrement() <= 3)
            Flowable.timer(2, TimeUnit.SECONDS)
        else Flowable.error(it)
    }
}

如果您只想在发生给定类型的错误时重试,您可以使用:

.retryWhen { errors ->
    val retryCounter = AtomicInteger()
    errors.flatMap {
        if (it is YourSpecficError && retryCounter.getAndIncrement() <= 3)
            Flowable.timer(2, TimeUnit.SECONDS)
        else Flowable.error(it)
    }
}

如果您想独立重试每个,您可以使用:

doSomething.retryWhen { ... }
    .andThen(doSomethingElse.retryWhen { ... })
    .subscribe()

另外,为了避免 retryWhen 逻辑重复,可以将其封装在一个扩展函数中:

fun Completable.retryDelayed(): Completable {
    return this.retryWhen { errors ->
        val retryCounter = AtomicInteger()
        errors.flatMap {
            if (it is YourSpecficError && retryCounter.getAndIncrement() <= 3)
                Flowable.timer(2, TimeUnit.SECONDS)
            else Flowable.error(it)
        }
    }
}

如果你想并行运行你的 Completable,你可以使用merge操作符:

Completable doAll = Completable.merge(listOf(doSomething, doSomething))

推荐阅读