java - Java Kafka Consumer在多线程中的使用
问题描述
我正在考虑在线程池中使用 Kafka Consumer。我提出了这种方法。现在它似乎工作正常,但我正在考虑缺点以及这种方法会带来什么问题。基本上我需要的是将记录处理与消费分离。此外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交。有人可以就如何更好地做到这一点提出建议或建议吗?
final var consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
final var threadPool = Executors.newFixedThreadPool(32);
while(true) {
ConsumerRecords<String, String> records;
synchronized (consumer) {
records = consumer.poll(Duration.ofMillis(100));
}
CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
synchronized (consumer) {
consumer.commitSync();
}
});
}
解决方案
问题
此解决方案对于所述要求并不稳健:
另外,我需要有一个强有力的保证,即只有在处理完所有记录后才会提交
设想:
- 轮询读取 100 条记录,开始异步处理
- 轮询读取 5 条记录,开始异步处理
- 立即处理 5 条记录,并在处理 100 条记录时完成消费者提交
- 消费者崩溃
当消费者再次启动时,最后一次提交将对应于第 105 条记录。因此它将开始处理第 106 条记录,我们错过了成功处理 1-100 条记录。
您只需通过以下方式提交您在该轮询中处理的偏移量:
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
此外,需要保证顺序,以便首先提交第一个轮询,然后是第二个,依此类推。这将是相当复杂的。
主张
我相信您正在尝试在消息处理中实现并发。这可以通过更简单的解决方案来实现。增加你的max.poll.records以读取一个像样的批次,将其分成更小的批次并异步运行它们以实现并发。完成所有批次后,提交给 kafka 消费者。
推荐阅读
- database - Sqlite 数据库不使用现有索引
- google-cloud-platform - Google Cloud 函数 Nodejs 执行上下文和全局变量
- php - Codeigniter 中的 Mysqli 异常
- python - 如何从包含最大 3 个值的 2d numpy 数组中获取列的索引
- php - Symfony4 内置安全性上的密码重置/更改密码
- typescript - Typescript3.1.3 + 泛型,类型分配错误
- sql-server - SQL 行聚合
- javascript - 切换翻转开关后如何使弹出屏幕出现?
- odoo-11 - Odoo 11 树(高级视图)未打开
- excel - 用户定义函数的自动填充