kafka-consumer-api - 如何在处理完大量消息之前暂停消费
问题描述
我正在寻找一种方法来暂停 Kafka 分区的消费,直到处理完大量消息。
例如
kafkaReceiver.receive() // I would like to pause this consumption until the current flux is processed
.flatmapIterable(records -> process(records))
.concatMap(record -> commit(record))
目前我正在考虑添加delay
或添加onBackpressureBuffer
以减慢消耗。
任何建议或指示都会非常有帮助。
解决方案
推荐阅读
- oracle - 获取数据库名称,卸载数据库时的创建日期 Oracle 11g
- actions-on-google - 是否可以使用字符串更改用户存储中的字段名称?
- javascript - 如何添加确认对话框?如果按是,按钮将下载,如果按否,按钮将取消
- react-native - 无法读取 React-Native 和 React-Navigation 3.x 上未定义的属性“ScrollView”
- excel - 将值从一个 Excel 工作簿粘贴到其他工作簿
- python - 如果输入不是整数,则请求用户重新输入
- javascript - 如何更好地将学生科目和成绩插入数据库?
- python - 如何使用熊猫在 dataFrame 中创建句点?
- modx - 如何在 MODX 集合子网格中配置下拉列表?
- types - 在 Julia 中将 DataType 转换为 UnionAll