首页 > 解决方案 > 相同的偏移量和分区记录被消耗两次导致重复

问题描述

我正在尝试使用用 spring-kafka 编写的应用程序来使用记录。我面临着非常独特的情况,无法理解为什么会这样?

我的消费者应用程序以 2 个并发运行,这意味着 2 个消费者线程订阅了具有两个分区的主题。我正在使用记录并使用带有偏移量、分区和插入时间戳的 upsert 将其放入表中。

我在表中看到不应出现的具有相同偏移量和分区的重复值。时间戳值没有差异,意味着插入同时发生。我不确定这怎么可能?我在日志中也没有看到任何问题。我不确定生产者端发生了什么,但无论如何我们不能在相同的偏移量处有 2 个值,所以不确定这是否是生产者端的消费者端的问题。任何有助于我分类的建议或想法这个问题?

在此处输入图像描述

卡夫卡日志:

我在日志中也没有看到任何异常。

14:29:56.318 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
14:29:56.318 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
14:29:56.318 [main] INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1604154596318
14:29:56.319 [main] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Subscribed to topic(s): kaas.pe.enrollment.csp.ts2
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Cluster ID: 6cbv7QOaSW6j1vXrOCE4jA
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.915 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Discovered group coordinator apslp1563.uhc.com:9093 (id: 2147483574 rack: null)
14:29:57.923 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:57.924 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.121 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] (Re-)joining group
14:29:58.125 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] (Re-)joining group
14:30:13.127 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Finished assignment for group at generation 23: {consumer-csp-prov-emerald-test-1-19d92ba5-5dc3-433d-b967-3cf1ce1b4174=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@d7e2a1f, consumer-csp-prov-emerald-test-2-5833c212-7031-4ab1-944b-7e26f7d7a293=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@53c3aad3}
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.131 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Successfully joined group with generation 23
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-1
14:30:13.134 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Adding newly assigned partitions: kaas.pe.enrollment.csp.ts2-0
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-1, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-0 to the committed offset FetchPosition{offset=500387, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1559.uhc.com:9093 (id: 69 rack: null), epoch=37}}
14:30:13.143 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-csp-prov-emerald-test-2, groupId=csp-prov-emerald-test] Setting offset for partition kaas.pe.enrollment.csp.ts2-1 to the committed offset FetchPosition{offset=499503, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=apslp1562.uhc.com:9093 (id: 72 rack: null), epoch=36}}

代码 :

@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {

        try {
                prov_tin_number         = record.value().get("providerTinNumber").toString();
                prov_tin_type           = record.value().get("providerTINType").toString();
                enroll_type             = record.value().get("enrollmentType").toString();
                vcp_prov_choice_ind     = record.value().get("vcpProvChoiceInd").toString();
                error_flag              = "";

                dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
                    record.partition(), record.offset());
                
            
                acknowledgement.acknowledge();
                    
        }catch (Exception ex) {
            System.out.println(record);
            System.out.println(ex.getMessage());
        }

    }

标签: apache-kafkaspring-kafka

解决方案


推荐阅读