apache-kafka - 如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?
问题描述
使用 Java Kafka Consumerseek()
函数,它需要我们传入TopicPartion
and Offest
。但是,我认为这个 seek 方法会为我的消费者获取订阅的 TopicPartitions 集合。
这是我要处理的示例。
消费者 A 订阅了主题“test-topic”分区 1 和 2。当我调用poll()
. 我处理了一些消息,但我的应用程序出现异常。我不叫commitSync()
。现在我想回到我上次检索到的那些偏移量,poll()
并尝试重新处理它们。那么我该怎么做呢?我是否需要检查每个主题分区的最后提交的偏移量并调用seek()
每个分区?seek()
多次调用是否只接受最后一次调用seek()
?正如我所说,我想确保我的消费者返回所有分区,这样我就不会丢失任何已分配分区上的任何数据。
解决方案
我处理了一些消息,但我的应用程序出现异常。我不调用 commitSync()
如果您不调用commitSync()
,则不会提交消息。如果假设异常杀死了您的程序,那么在重新启动后,消费者通常会从最后提交的偏移量中读取它。
您可能还需要检查auto.offset.reset并将其设置为earliest
.
检查您的消息是否自动提交,因为您commitSync()
不需要自动提交,即enable.auto.commit
可以设置为false
(默认情况下它true
在 Confluent Kafka 中)
如果您的程序没有被异常终止,您始终拥有已使用的记录。您可以重试处理每条记录,然后提交。
ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord record: records)
{
tryProcess(record, 3);
}
consumer.commitSync();
void tryProcess(ConsumerRecord record, int maxRetries) {
if(maxRetries < 1) {
log.warn("max retries exhausted for record");
return;
}
try {
process(record);
} catch(Exception ex){
tryProcess(record, --maxRetries);
}
}
您还可以尝试通过重试来处理批量记录,而不是像tryProcess(records, 3)
记录对应的每条记录那样ConsumerRecords
重试 3 次。我不认为有寻求的必要。
不过,我仍然对 seek() api 的用法感到好奇
seek()
例如,当我们不使用订阅时,可能会使用它,consumer.subscribe()
而是consumer.assign()
当我们只想查看(查看)主题中的消息时通常会这样做,例如控制台消费者。有时,我们可能需要在某个偏移量之后查看一些消息,或者最后n条消息等,而实际上并没有对它们做任何事情,而只是显示。
推荐阅读
- javascript - 使用永久地址让我的树莓派在线直播
- java - 为什么这些循环和散列操作需要 O(N) 时间复杂度?
- typescript - 可以通过接收到的构造函数参数动态地计算类属性吗?
- python - 以不同的方式为图像的各个部分着色
- python - 屏幕截图程序在截图后留下正方形
- apache-kafka - Flink 中的 StreamExecutionEnvironment 和 StreamTableEnvironment 有什么区别
- loops - 我需要使用循环使 4 个正方形垂直移动
- python - 将文件上传到 Django Admin,对其进行处理,然后交付到模板
- arrays - 在反应jsx中一次遍历一个数组
- spring-boot - Kotlin + Mockito + Spring-boot