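apache-kafka - Records with the same offset and partition are consumed twice, causing duplicates
Problem description
I am trying to consume records with an application written with spring-kafka. I have run into an unusual situation and cannot work out why it happens.
My consumer application runs with a concurrency of 2, which means 2 consumer threads subscribe to a topic with two partitions. I consume each record and upsert it into a table together with its offset, its partition, and an insert timestamp.
In the table I see duplicate rows with the same offset and partition, which should not happen. The timestamps do not differ, which means the inserts happened at the same time. I am not sure how this is possible, and I see no problems in the logs. I do not know what is happening on the producer side, but in any case there cannot be two values at the same offset, so I cannot tell whether this is a producer-side or a consumer-side problem. Any suggestions or ideas that would help me triage this issue?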
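One way to narrow this down is to check whether the listener itself is invoked twice for the same (partition, offset) pair, or whether the listener runs once and the two rows come from the upsert. The following is a minimal sketch of such a check, not a fix. `DeliveryTracker` and `recordDelivery` are hypothetical names introduced here and are not part of spring-kafka. The map grows without bound, so this is only suitable for a short debugging session.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical triage helper (not part of spring-kafka): remembers which
 * thread first handled each (partition, offset) pair.
 */
final class DeliveryTracker {
    // Key is "partition-offset"; value is the name of the first thread that saw it.
    private final Map<String, String> firstSeenBy = new ConcurrentHashMap<>();

    /**
     * Returns null the first time a (partition, offset) pair is seen,
     * otherwise the name of the thread that handled it first.
     */
    String recordDelivery(int partition, long offset) {
        String key = partition + "-" + offset;
        // putIfAbsent is atomic, so two threads racing on the same key
        // cannot both be told that they were first.
        return firstSeenBy.putIfAbsent(key, Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        // First call for this pair: nothing was recorded before.
        System.out.println(tracker.recordDelivery(0, 500387L));
        // Second call for the same pair: reports the thread that saw it first.
        System.out.println(tracker.recordDelivery(0, 500387L));
    }
}
```

Calling `recordDelivery(record.partition(), record.offset())` at the top of the listener and logging any non-null result would show whether the same record reaches the listener more than once. If it never does, the duplicate rows are more likely produced on the database side.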
Kafka logs:
I do not see any exceptions in the logs either.
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
14:29:56.318 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1604154596318
14:29:56.319 [main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Subscribed to topic(s): kaas.pe.enrollment.csp.ts2
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.923 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:57.924 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.121 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.125 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:30:13.127 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Finished assignment for group at generation 23: {consumer-csp-prov-emerald-test-1-19d92ba5-5dc3-433d-b967-3cf1ce1b4174=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@d7e2a1f, consumer-csp-prov-emerald-test-2-5833c212-7031-4ab1-944b-7e26f7d7a293=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@53c3aad3}
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500387, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499503, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}
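The last two log lines show the committed offset each consumer resumed from after the group was assigned. As a hedged aid for triage, the sketch below pulls those starting positions out of the log text so they can be compared with the offsets of the duplicated rows. `CommittedOffsetParser` is a hypothetical helper, the regular expression only targets the message format shown in these logs, and the sample lines in `main` are abbreviated copies of the lines above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the resume offsets logged by ConsumerCoordinator. */
final class CommittedOffsetParser {
    // Matches the "Setting offset for partition ..." message shown in the logs above.
    private static final Pattern SETTING_OFFSET = Pattern.compile(
        "Setting offset for partition (\\S+) to the committed offset FetchPosition\\{offset=(\\d+)");

    /** Maps "topic-partition" to the committed offset the consumer resumed from. */
    static Map<String, Long> parse(List<String> logLines) {
        Map<String, Long> startOffsets = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = SETTING_OFFSET.matcher(line);
            if (m.find()) {
                startOffsets.put(m.group(1), Long.parseLong(m.group(2)));
            }
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        // Abbreviated copies of the two "Setting offset" lines from the logs above.
        List<String> lines = List.of(
            "Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500387, offsetEpoch=Optional.empty",
            "Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499503, offsetEpoch=Optional.empty");
        System.out.println(parse(lines));
    }
}
```

If the duplicated offsets sit at or just above a resume position, redelivery of records that were processed but not yet committed before a restart or rebalance is one candidate to examine. That would not by itself explain identical insert timestamps, so it is only one line of inquiry.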
Code:
@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
    try {
        // These variables are assigned without a local declaration, so they appear
        // to be fields of the listener class and would be shared by both consumer threads.
        prov_tin_number = record.value().get("providerTinNumber").toString();
        prov_tin_type = record.value().get("providerTINType").toString();
        enroll_type = record.value().get("enrollmentType").toString();
        vcp_prov_choice_ind = record.value().get("vcpProvChoiceInd").toString();
        error_flag = "";
        // Upsert the record together with its partition and offset.
        dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
                record.partition(), record.offset());
        // Acknowledge only after the upsert has returned.
        acknowledgement.acknowledge();
    } catch (Exception ex) {
        // On failure the record is printed but not acknowledged.
        System.out.println(record);
        System.out.println(ex.getMessage());
    }
}
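The listener assigns `prov_tin_number` and the other values without declaring them locally, which suggests they are fields of the listener class. Spring beans are singletons by default, so with a concurrency of 2 both consumer threads would write to the same fields. This does not by itself explain duplicate (partition, offset) rows, since those two values are passed straight from the record, but it is a source of cross-thread interference worth removing while triaging. The sketch below keeps everything in local variables and an immutable object. `EnrollmentRow` is a hypothetical class, the `Map` is a stand-in for the Avro `GenericRecord`, and the field values in `main` are made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable holder for one record's fields; safe to share between threads. */
final class EnrollmentRow {
    final String provTinNumber;
    final String provTinType;
    final String enrollType;
    final String vcpProvChoiceInd;
    final int partition;
    final long offset;

    private EnrollmentRow(String provTinNumber, String provTinType, String enrollType,
                          String vcpProvChoiceInd, int partition, long offset) {
        this.provTinNumber = provTinNumber;
        this.provTinType = provTinType;
        this.enrollType = enrollType;
        this.vcpProvChoiceInd = vcpProvChoiceInd;
        this.partition = partition;
        this.offset = offset;
    }

    /**
     * Builds a row from its arguments only, so concurrent callers never touch
     * shared mutable state. The Map stands in for GenericRecord (assumption).
     */
    static EnrollmentRow from(Map<String, Object> value, int partition, long offset) {
        return new EnrollmentRow(
            value.get("providerTinNumber").toString(),
            value.get("providerTINType").toString(),
            value.get("enrollmentType").toString(),
            value.get("vcpProvChoiceInd").toString(),
            partition,
            offset);
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only.
        Map<String, Object> value = new HashMap<>();
        value.put("providerTinNumber", "123456789");
        value.put("providerTINType", "T");
        value.put("enrollmentType", "E");
        value.put("vcpProvChoiceInd", "Y");
        EnrollmentRow row = EnrollmentRow.from(value, 0, 500387L);
        System.out.println(row.partition + "-" + row.offset + " " + row.provTinNumber);
    }
}
```

In the listener, the equivalent change would be to declare the four values as local `String` variables inside `run` before passing them to `peStremUpsertTbl`.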
Solution