首页 > 解决方案 > Reactor - 在处理错误的情况下延迟通量元素

问题描述

我有一个与这个问题类似的问题,我没有看到一个可接受的答案。我已经研究过了,没有得到满意的答案。

我有一个轮询量为“x”的反应式 Kafka 消费者(Spring Reactor),应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务的超时执行可能不同,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。当前反应堆中有没有办法自动

  1. 当断路器处于断开状态时做出反应,减少轮询量或减缓消耗。
  2. 当电路关闭时,将轮询量增加到以前的状态(如果它关闭,外部服务将按比例增加)。

我不想使用delayElements,或者delayUntil因为它们本质上是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用程序配置中打开时,我会为消费者提供值。

标签: javareactive-programmingproject-reactorcircuit-breakerreactive-kafka

解决方案


As backpressure is based on the slowness of the consumer, one way to achieve this is to convert certain exception types to delay. You can use the onErrorResume for this purpose as demonstrated below:

long start = System.currentTimeMillis();

Flux.range(1, 1000)
        .doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
        .flatMap(item -> process(item).onErrorResume(this::slowDown), 5) // concurrency limit for demo
        .blockLast();

System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");

private Mono<Integer> process(Integer item) {
    // simulate error for some items
    if (item >= 50 && item <= 100) {
        return Mono.error(new RuntimeException("Downstream failed."));
    }

    // normal processing
    return Mono.delay(Duration.ofMillis(10))
            .thenReturn(item);
}

private Mono<Integer> slowDown(Throwable e) {
    if (e instanceof RuntimeException) { // you could check for circuit breaker exception
        return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
    }

    return Mono.empty(); // no delay for other errors
}

If you check the output of this code, you can see there is some slow down between the items 50 and 100 but it works at regular speed before and after.

Note that my example does not use Kafka. As you are using reactor-kafka library which honors backpressure it is supposed to work the same way as this dummy example.

Also, as the Flux might process items concurrently, the slow down is not immediate, it will try to process some additional items before properly slowing down.


推荐阅读