apache-kafka - kafka异步产生丢失消息
问题描述
尝试按照互联网上的说明实现kafka异步生产。这是我的制片人的样子:
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public void asynSend(String topic, Integer partition, String message) {
ProducerRecord<Object, Object> data = new ProducerRecord<>(topic, partition,null, message);
producer.send(data, new DefaultProducerCallback());
}
private static class DefaultProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Asynchronous produce failed");
}
}
}
我在这样的 for 循环中生成:
for (int i = 0; i < 5000; i++) {
int partition = i % 2;
FsProducerFactory.getInstance().asynSend(topic, partition,i + "th message to partition " + partition);
}
但是,某些消息可能会丢失。如下图,从 4508 到 4999 的消息没有送达。
我发现原因可能是生产者进程的关闭,并且当时未发送的缓存中的所有消息都会丢失。在 for 循环之后添加此行将解决此问题:
producer.flush();
但是,我不确定这是否是一个魅力解决方案,因为我注意到有人提到刷新会使异步发送以某种方式同步,任何人都可以解释或帮助我改进它吗?
解决方案
书中Kafka - The definitive Guide
有一个异步生产者的示例,完全按照您编写的代码给出。它send
与Callback
.
在讨论中写道:
在退出之前添加
flush()
将使客户端等待任何未完成的消息被传递到代理(这将是大约queue.buffering.max.ms
,加上延迟)。如果您flush()
在每次produce()
调用后添加,您将有效地实现同步生产者。
但是,如果您在循环之后for
执行此操作,则它不再是同步的,而是异步的。
您还可以做的是acks
将 Producer 配置中的 设置为all
. 这样,在主题的复制设置为大于 1 的情况下,您将有更多的保证来成功生成消息。
推荐阅读
- lambda - Java 8 函数式编程,收集 List 类型的流的输出
? - python - Django None Type 对象不可调用
- angular - Angular 10 - 有条件地动态加载特定的较少文件
- javascript - 如何使用 Jasmine 在 Angular 中模拟服务依赖项?
- exec - Azure Synapse 将 exec 的结果插入表中
- python - 在哪里可以找到 Python 中内置 map() 函数的源代码
- ajax - chrome 中的网络选项卡不会针对发送的每个请求进行更新
- python - 仅允许使用 -b 重定向或标准输入或标准输出的管道
- javascript - 带有包含值的 chartjs 堆积条形图
- c++ - Linux socket C/C++ - 检查 ip/port 是否已被使用的最佳方法是什么?