kafka-consumer-api - 即使在启用了exact_once之后,kafka流也会获得重复的记录
问题描述
我正在使用 kafka 流来接收一些数据,我注意到它收到的记录比我发送的要多,下面是我在消费者的设置
在消费者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-user-process");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSettigs.getKafkaBroker());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaSettigs.getTotalStreamTHreadCounnt());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put("isolation.level", "read_committed");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "600");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
制片方的道具
Propertiesprops=newProperties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"mybootstarpservers");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"clientnoveluser");
props.put(ProducerConfig.ACKS_CONFIG,"all");
props.put(ProducerConfig.RETRIES_CONFIG,3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG,1500))
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,(newGenericSerializer<MyPojo>()).getClass().getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyRandom.class);
下面是我的生产者代码
public void producerSendData(String key, MyPojo message) throws Exception {
final Producer<String, MyPojo s> producer = myProducerInstance.createProducer();
final ProducerRecord<String, MyPojo> record = new ProducerRecord<String, MyPojo>("usertopic", key,message);
try {
producer.send(record, new ProducerCallback());
producer.flush();
}
finally {
}
}
我的主题总共有 10 个分区,我的生产者使用循环类型的分区逻辑并平等地写入所有分区,以便在生产者端进行测试,10 个不同的线程每个写入 1000 条消息。
在消费者方面,有时我收到的消息比发送的消息多,我收到的消息像 10867 一样,而我只发送了 10000 条消息。
我注意到我得到了这些重复项,每个流与以下消息重新连接。
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-29value:{"userId":"message-468","data":null,"data1":null,"data3":null}
**2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.KafkaConsumer:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-restore-consumer,groupId=]Unsubscribedalltopicsorpatternsandassignedpartitions
2019-07-14T00:11:06,043INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.s.p.i.StreamThread$RebalanceListener:stream-thread[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]partitionrevocationtook16ms.
suspendedactivetasks:[0_6]
suspendedstandbytasks:[]
2019-07-14T00:11:06,044INFO[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8]o.a.k.c.c.i.AbstractCoordinator:[ConsumerclientId=streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-8-consumer,groupId=streams-user-process](Re-)joininggroup**
2019-07-14T00:11:06,043DEBUG[streams-user-process-ed85a88b-73c5-4416-9564-c15343fd53bd-StreamThread-6]c.j.m.s.UserKafkaStreamTopology:DataatStream
key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
我需要帮助才能了解为什么我会收到更多记录,即使我启用了 exact_once
解决方案
Exactly once 用于流处理保证对于每个接收到的记录,它的处理结果将被反映一次,即使在失败的情况下也是如此。
Kafka 上下文中的 Exactly_once 是一个适用于“Kafka Streams”的概念,请记住,Kafka Streams 旨在从主题读取并生成主题。
在 Kafka Streams 世界中重新表述:Exactly once 意味着当且仅当状态相应更新并且输出记录成功生成一次时,任何输入记录的处理都被认为已完成。
在您的特定情况下,您的日志key:key-30value:{"userId":"message-569","data":null,"data1":null,"data3":null}
似乎是由peek
拓扑的方法生成的。
如果您能找到预期的事件数量,您应该检查接收器主题。
因为如果出于某种原因,您的 Kafka Streams 应用程序无法将消息发布到接收器主题,那么传入消息被再次使用和处理以生成输出消息然后保证“恰好一次”合同,这听起来很正常。这就是为什么同一条消息可以在您的日志中多次显示的原因。
您可以在https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/找到更多详细信息
推荐阅读
- java - Morphia 2.0.0-RC1 查询在我保存对象后返回空列表
- python - 气流调度程序错误 - MySQL OperationalError 2006,“无法连接到 MySQL 服务器
(111)" - php - preg_match_all 多条代码扫描匹配第一个单词匹配
- python - POST 请求不适用于令牌验证检查
- python - 在 Django (Django 3.0) 中将现有 id 转换为 uuid 后修复错误“格式错误的十六进制 UUID 字符串”
- python - 使用 Selenium 和 Chrome 开发工具的 Chrome 内存泄漏
- string - 在 Lua 中捕获不被平衡括号括起来的字符串
- .htaccess - 如何使用 htaccess 在 url 中添加斜杠?
- php - 根据在 WooCommerce 中选择的城市显示或隐藏结帐邮政编码字段
- c - 是否定义了递增 _Bool?