首页 > 解决方案 > Reactor 中的自定义重试

问题描述

我试图根据 Reactor 额外包的功能在 Kotlin 和 Reactor 中实现重试逻辑。我想要做的是传递一个持续时间列表,并且每次context.iteration我都得到列表的第 (iteration-1)th 元素。它部分有效,我总是IndexOutOfBoundsException在最后一次迭代中得到一个,这比我想要的要多,尽管我提供了最大重试次数 - 列表的大小。虽然重试在给定的持续时间和“正确”的次数内运行(肯定是因为IndexOutOfBoundsException阻止了更多),但只有这个异常(这是根本原因)困扰着我。

这是我的自定义 BackOff 界面:

interface MyCustomBackoff : Backoff {
    companion object {
        fun getBackoffDelay(backoffList: List<Duration>): (IterationContext<*>) -> BackoffDelay {
            return { context -> BackoffDelay(backoffList[(context.iteration() - 1).toInt()]) }
        }
    }
}

我的 Kotlin 扩展是:

fun <T> Mono<T>.retryCustomBackoffs(backoffList: List<Duration>, doOnRetry: ((RetryContext<T>) -> Unit)? = null): Mono<T> {
    val retry = Retry.any<T>().retryMax(backoffList.size.toLong()).backoff(MyCustomBackoff.getBackoffDelay(backoffList))

    return if (doOnRetry == null) {
        this.retryWhen(retry)
    }
    else {
        this.retryWhen(retry.doOnRetry(doOnRetry))
    }
}

我在这里想念什么?

标签: kotlinspring-webfluxproject-reactorretry-logic

解决方案


如果你看reactor.retry.AbstractRetry#calculateBackoff,你会发现有一个特殊的BackoffDelay命名RETRY_EXHAUSTED。并且它在retryContext.iteration() > maxIterations(不是>=)之后返回backoff.apply(retryContext)

if (retryContext.iteration() > maxIterations || Instant.now(clock).plus(jitteredBackoff).isAfter(timeoutInstant))
    return RETRY_EXHAUSTED;

因此,如果您在列表中有 2 个自定义退避延迟,则会产生 3 个退避延迟calculateBackoff

你可以改变你的MyCustomBackoff喜欢(对不起Java,我不熟悉Kotlin):

public interface MyCustomBackoff extends Backoff {
    static Backoff getBackoffDelay(List<Duration> backoffList) {
        return context -> context.iteration() <= backoffList.size() ?
                new BackoffDelay(backoffList.get(Long.valueOf(context.iteration() - 1).intValue())) :
                new BackoffDelay(Duration.ZERO);
    }
}

推荐阅读