首页 > 解决方案 > 无限通量和批量写入数据库

问题描述

在继续实际事件处理之前,我有一个无限的 Flux(来自使用 reactor-kafka 的 kafka)的事件,我试图将这些事件批量写入数据库。我的问题是让它在适当的背压下工作。

windowTimeout并且bufferTimeout似乎是不错的候选人,因为他们允许我指定最大大小,但也限制了在“流量”低的情况下等待的时间。

首先是windowTimeout,从那里对数据库进行批量写入。然而,这很快就出现了问题:reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...))

然后我切换到bufferTimeout,但没有成功,出现错误reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

我希望以下说明我所追求的流程:

flux.groupBy(envelope -> envelope.partition)
  .flatMap(partitionFlux -> {
    final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
    final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
      .concatMap(batch -> {
        final ConsumedEnvelope last = batch.get(batch.size() - 1);

        return repository.persist(batch) // a)
          .then(last.acknowledge()) // b)
          .thenReturn(batch);
      });

    return processing(batchFlux);
  })
  .subscribe(result -> {
      // ...
  });

a)repository.persist在内部什么都不做,只是迭代批处理以创建插入操作,然后返回 a Mono<Void>

b) ConsumedEnvelope.acknowledge() 用于 Kafka 偏移,我只想在成功持久化批处理后才执行此操作。所有这些都包含在concatMap每个分区一次只处理一个批次中。

如上所述,这会导致溢出异常。是否有任何惯用的方式来实现我试图描述的内容?在我看来,这不应该是一个非常罕见的任务,但我是反应堆的新手,很想得到一些建议。

/d

编辑我意识到简单地添加onBackpressureBuffer实际上可以为我解决这个问题。但总的来说,有没有更好的方法来做到这一点?

编辑 2 ...由于不受约束的需求,上述内容确实引起了问题,我不知何故错过了。所以,回到最初的问题,或者也许是某种方式让 onBackpressureBuffer 不请求未绑定的需求,而只转发下游请求的内容。

标签: javaapache-kafkaproject-reactorflow-control

解决方案


推荐阅读