首页 > 解决方案 > 同步和异步如何同时工作

问题描述

我第一次使用 Kafka 来处理实时消息。

使用Sync偏移量,消息将在comittedfailed to commit之前从代理接收下一条消息。并且在ASync偏移的情况下,无论最后一条消息是否已提交,都将收到下一条commited消息still pending

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d,
                customer = %s, country = %s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(); 1
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); 2
    } finally {
        consumer.close();
    }
}

但是让我们说如果我Sync and ASync像上面的例子一样同时使用两者,它会如何工作?

例如,在民意调查中,如果我收到 5 条偏移量为 的消息1 to 5,并且其中的偏移量也1 and 2得到处理和提交。但是从3 to 5偏移量得到处理但未提交,对于发送的相同请求ASync但卡在网络中的某个地方。

所有提交请求都将已经从 ASync 发送到代理,但1 and 2仅提交,然后部分控制权交给 Sync final。它将尝试Not-Committed一一提交所有消息,直到失败或成功。但是,如果它仅在第三个偏移处失败,那么在重新平衡中,它将从哪里开始读取?

标签: javaapache-kafka

解决方案


finally 块只有在 while 循环中出现异常时才会到达,因此一次只发生一种提交类型

另请注意,您正在提交完整批次,而不是在 for 循环中一一提交,默认 max.poll.records=500,那么这是您在任何时候提交的最大偏移量

最新和最早将如何发挥作用
这仅适用于不存在先前组的情况。

如果一个组存在并重新平衡,它总是从现有的偏移量恢复,不使用这个属性


推荐阅读