首页 > 解决方案 > 发送大量消息 Kafka Producer

问题描述

我正在使用卡夫卡。

我有一个包含 10k json 的列表。

目前我发送的Json如下:

for(int i=0 ;i< jsonList.size(); i++){
     ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList[i]);
     producer.send(record);
}

发送每条消息。

我想将列表发送到 kafka 并让 kafka 在 json 之后发送 json(不是一条包含所有 json 字符串的消息),例如:

ProducerRecord<K,V> record = new ProducerRecord(topic, jsonList);
producer.send(record);

我该怎么做?

谢谢

标签: apache-kafka

解决方案


官方通过使用KafkaProducerproducerRecord你不能这样做,但你可以通过配置一些属性来做到这一点ProducerConfig

来自文档生产者的batch.size 将记录批量化为发送到同一分区的请求并立即发送

每当多个记录被发送到同一个分区时,生产者将尝试将记录一起批处理成更少的请求。这有助于客户端和服务器的性能。此配置控制默认批量大小(以字节为单位)。不会尝试批处理大于此大小的记录。

linger.ms这个设置用于生产者的延迟时间,让生产者保持一段时间,以便在此期间所有请求都会被批量发送,但是batch.size是这个的上限,如果生产者获得足够的批量大小它将忽略这个属性并向kafka发送批量消息

生产者将在请求传输之间到达的任何记录组合成一个批处理请求。此设置通过添加少量人为延迟来实现这一点——也就是说,生产者不会立即发送一条记录,而是等待给定的延迟,以允许发送其他记录,以便可以将发送一起批处理。这个设置给出了批处理延迟的上限:一旦我们得到一个分区的 batch.size 值的记录,不管这个设置如何,它都会立即发送。


推荐阅读