首页 > 解决方案 > 即使在启用了exact_once之后,kafka流也会获得重复的记录

问题描述

我正在使用 kafka 流来接收一些数据,我注意到它收到的记录比我发送的要多,下面是我在消费者的设置

在消费者

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-user-process");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettigs.getKafkaBroker());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaSettigs.getTotalStreamTHreadCounnt());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put("isolation.level", "read_committed");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "600");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
       props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

制片方的道具

Propertiesprops=newProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"mybootstarpservers");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"clientnoveluser");

props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,1500))
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,(newGenericSerializer<MyPojo>()).getClass().getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyRandom.class);

下面是我的生产者代码

public void producerSendData(String key, MyPojo message) throws Exception {

        final Producer<String, MyPojo s> producer =  myProducerInstance.createProducer();
        final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", key,message);
        try {
            producer.send(record, new ProducerCallback());
            producer.flush();
        }
        finally {

        }

    }

我的主题总共有 10 个分区,我的生产者使用循环类型的分区逻辑并平等地写入所有分区,以便在生产者端进行测试,10 个不同的线程每个写入 1000 条消息。

在消费者方面,有时我收到的消息比发送的消息多,我收到的消息像 10867 一样,而我只发送​​了 10000 条消息。

我注意到我得到了这些重复项,每个流与以下消息重新连接。

2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-29value:{"userId":"message-468","data":null,"data1":null,"data3":null}
**2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.KafkaConsumer:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-restore-consumer,groupId=]Unsubscribedalltopicsorpatternsandassignedpartitions
2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.s.p.i.StreamThread$RebalanceListener:stream-thread[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]partitionrevocationtook16ms.
    suspendedactivetasks:[0_6]
    suspendedstandbytasks:[]
2019-07-14T00:11:06,044INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.i.AbstractCoordinator:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-consumer,groupId=streams-user-process](Re-)joininggroup**
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}

我需要帮助才能了解为什么我会收到更多记录,即使我启用了 exact_once

标签: kafka-consumer-apiapache-kafka-streamskafka-producer-api

解决方案


Exactly once 用于流处理保证对于每个接收到的记录,它的处理结果将被反映一次,即使在失败的情况下也是如此。

Kafka 上下文中的 Exactly_once 是一个适用于“Kafka Streams”的概念,请记住,Kafka Streams 旨在从主题读取并生成主题。

在 Kafka Streams 世界中重新表述:Exactly once 意味着当且仅当状态相应更新并且输出记录成功生成一次时,任何输入记录的处理都被认为已完成。

在您的特定情况下,您的日志key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}似乎是由peek拓扑的方法生成的。

如果您能找到预期的事件数量,您应该检查接收器主题。

因为如果出于某种原因,您的 Kafka Streams 应用程序无法将消息发布到接收器主题,那么传入消息被再次使用和处理以生成输出消息然后保证“恰好一次”合同,这听起来很正常。这就是为什么同一条消息可以在您的日志中多次显示的原因。

您可以在https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/找到更多详细信息


推荐阅读