首页 > 解决方案 > kafka异步产生丢失消息

问题描述

尝试按照互联网上的说明实现kafka异步生产。这是我的制片人的样子:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
    ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
        producer.send(data, new DefaultProducerCallback());
    }

private static class DefaultProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            logger.error("Asynchronous produce failed");
        }
    }
}

我在这样的 for 循环中生成:

for (int i = 0; i < 5000; i++) {
    int partition = i % 2;
    FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}

但是,某些消息可能会丢失。如下图,从 4508 到 4999 的消息没有送达。

在此处输入图像描述

我发现原因可能是生产者进程的关闭,并且当时未发送的缓存中的所有消息都会丢失。在 for 循环之后添加此行将解决此问题:

producer.flush();

但是,我不确定这是否是一个魅力解决方案,因为我注意到有人提到刷新会使异步发送以某种方式同步,任何人都可以解释或帮助我改进它吗?

标签: apache-kafkakafka-producer-api

解决方案


书中Kafka - The definitive Guide有一个异步生产者的示例,完全按照您编写的代码给出。它sendCallback.

讨论中写道:

在退出之前添加flush()将使客户端等待任何未完成的消息被传递到代理(这将是大约queue.buffering.max.ms,加上延迟)。如果您flush()在每次produce()调用后添加,您将有效地实现同步生产者。

但是,如果您在循环之后for执行此操作,则它不再是同步的,而是异步的。

您还可以做的是acks将 Producer 配置中的 设置为all. 这样,在主题的复制设置为大于 1 的情况下,您将有更多的保证来成功生成消息。


推荐阅读