首页 > 解决方案 > 如何在处理完大量消息之前暂停消费

问题描述

我正在寻找一种方法来暂停 Kafka 分区的消费,直到处理完大量消息。

例如

kafkaReceiver.receive() // I would like to pause this consumption until the current flux is processed
.flatmapIterable(records -> process(records))
.concatMap(record -> commit(record))

目前我正在考虑添加delay或添加onBackpressureBuffer以减慢消耗。

任何建议或指示都会非常有帮助。

标签: kafka-consumer-apiproject-reactorreactor

解决方案


推荐阅读