java - 同步和异步如何同时工作
问题描述
我第一次使用 Kafka 来处理实时消息。
使用Sync
偏移量,消息将在comitted
或failed to commit
之前从代理接收下一条消息。并且在ASync
偏移的情况下,无论最后一条消息是否已提交,都将收到下一条commited
消息still pending
。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync(); 1
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(); 2
} finally {
consumer.close();
}
}
但是让我们说如果我Sync and ASync
像上面的例子一样同时使用两者,它会如何工作?
例如,在民意调查中,如果我收到 5 条偏移量为 的消息1 to 5
,并且其中的偏移量也1 and 2
得到处理和提交。但是从3 to 5
偏移量得到处理但未提交,对于发送的相同请求ASync
但卡在网络中的某个地方。
所有提交请求都将已经从 ASync 发送到代理,但1 and 2
仅提交,然后部分控制权交给 Sync final
。它将尝试Not-Committed
一一提交所有消息,直到失败或成功。但是,如果它仅在第三个偏移处失败,那么在重新平衡中,它将从哪里开始读取?
最新和最早将如何发挥作用?
如果属性设置为
latest
,那么消费者会在重新平衡后考虑哪条消息在代理中是最新的?因为3, 4, 5
已经存在已处理但未提交的消息,再加上一些新消息,可以说是6, 7, 8
偏移量。哪一个会是最新的?3号起还是6号起?如果属性设置为
earliest
,那么消费者会在重新平衡后考虑哪个消息最早在代理中?从哪个偏移消费者开始阅读消息?
解决方案
finally 块只有在 while 循环中出现异常时才会到达,因此一次只发生一种提交类型
另请注意,您正在提交完整批次,而不是在 for 循环中一一提交,默认 max.poll.records=500,那么这是您在任何时候提交的最大偏移量
最新和最早将如何发挥作用
这仅适用于不存在先前组的情况。
如果一个组存在并重新平衡,它总是从现有的偏移量恢复,不使用这个属性
推荐阅读
- javascript - 创建一个在变量未定义时解析的 Promise
- r - 编织降价时如何解决此错误
- python - 如何使用python中另一列的值创建一个新列
- android - 从 BroadcastReceiver 类 Kotlin 调用活动方法
- jenkins - Jenkins 插件管理最佳实践
- linux - 如何在 install4j 的控制台模式下随时关闭安装程序?
- javascript - 带有 0 值十进制的 JavaScript 整数
- camera - Openmv m7 摄像头模块是否适用于虹膜识别系统?
- python - 如何在 discord.py 中使用 sqlite3 配置欢迎事件?
- python - 无法将 csv 文件中的 cookie 导入 Python 中的 Requests Cookie Jar