首页 > 解决方案 > Kafka Producer 中的消息重试机制

问题描述

我们试图了解 Kafka 上的重试逻辑是如何工作的,我们在 ubuntu 的本地 kafka 集群上运行以下生产者代码

prop.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
prop.setProperty(ProducerConfig.ACKS_CONFIG, "1");
prop.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "16384");
prop.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
prop.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

final KafkaProducer<String, String> kprod = new KafkaProducer<String, String>(prop);
        
for (int i = 0; i<500; i++) 
{
    Thread.sleep(10);
    kprod.send(new ProducerRecord<>("sample-topic", "key_"   i, "Reference site about Lorem Ipsum, giving information on its origins, as well as a random Lipsum generator Value_"   i), null);
    System.out.println("Record Sent Successfully with Value: "   i);
}
System.out.println("\n\n\n\n\n\n\n\n---------------------------500 message loop complete --------------------------\n\n\n\n\n");
Thread.sleep(60000);

kprod.send(new ProducerRecord<>("sample-topic", "key_"   1000, "Reference site about Lorem Ipsum, giving information on its origins, as well as a random Lipsum generator Value_"   1000));

kprod.flush();
kprod.close();

循环执行时,代理被手动终止。完成后,消费者端收到了233条消息,之后没有收到任何消息。

一旦循环完成执行并且生产者代码进入睡眠状态,我们重新打开代理并期望在消费者端接收到 233 后的消息。但是,我们没有收到任何消息,而是收到以下错误 -

错误 [Consumer clientId=consumer-console-consumer-16102-1, groupId=console-consumer-16102] LeaveGroup 请求与 Generation{generationId=1, memberId='consumer-console-consumer-16102-1-12a7d5ff-518a-4e6c -a8c6-d60c2454b572', protocol='range'} 失败并出现错误:协调器正在加载,因此无法处理请求。(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

使用密钥 1000 在循环外发送的消息也未收到。当生产者代码到达 flush() 方法时,我们希望在消费者端接收到一些消息,但事实并非如此,并且正在尝试理解 Kafka 的这种行为。

任何帮助,将不胜感激。谢谢。

标签: apache-kafkakafka-producer-api

解决方案


推荐阅读