首页 > 解决方案 > 如何消费来自 kafka 主题的大消息

问题描述

Kafka Version 1.1.0

我有一个单节点 kafka 代理,在 config/server.properties 中有以下配置:

经纪人配置:

message.max.bytes=100000000
max.message.bytes=100000000
replica.fetch.max.bytes=150000000
log.segment.bytes=1073741824 (Default)

控制台使用者属性文件具有以下配置:

消费者属性:

receive.buffer.bytes=100000000
max.partition.fetch.bytes=100000000
fetch.max.bytes=52428800

我正在生成一条大小约为 20KB 的消息。我使用控制台制作人制作主题。然后在该主题上启动一个控制台使用者,它不会消耗完整的消息(介于两者之间)。

我查看了这篇文章并尝试设置相同的设置,但似乎没有成功。

我在这里想念什么?请帮帮我。

更新:

> echo | xargs --show-limits

Your environment variables take up 3891 bytes
POSIX upper limit on argument length (this system): 2091213
POSIX smallest allowable upper limit on argument length (all systems): 4096
Maximum length of command we could actually use: 2087322
Size of command buffer we are actually using: 131072
Maximum parallelism (--max-procs must be no greater): 2147483647

更新 1:

我已经测试了另一种情况。这次我使用 java 生产者而不是控制台生产者生成相同的消息,现在当我消费时,我得到了完整的消息。

标签: apache-kafka

解决方案


可能是因为您使用控制台生产者并将消息复制到终端(linux)但终端将长消息截断为最大固定长度而出现问题。

您可以尝试使用echo | xargs --show-limits或其他 shell 或术语设置来找出答案。

它也可以来自操作系统,例如 ARG_MAX:

getconf ARG_MAX

对于您的消息来说可能太小了。

最简单的方法是将文件直接写入 kafka-console-producer,例如该示例:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

如果它工作正常,则意味着这确实是问题所在。


作为记录,还应测试这些设置:

  • 消费者端:fetch.message.max.bytes- 这将确定消费者可以获取的最大消息大小。
  • 代理端:replica.fetch.max.bytes- 这将允许代理中的副本在集群内发送消息并确保消息被正确复制。如果这太小,则消息将永远不会被复制,因此,消费者将永远不会看到该消息,因为该消息将永远不会被提交(完全复制)。
  • 代理端:message.max.bytes- 这是代理可以从生产者接收到的最大消息大小。
  • 代理端(每个主题):max.message.bytes- 这是代理允许附加到主题的最大消息大小。此大小在压缩前经过验证。(默认为经纪人的message.max.bytes。)

推荐阅读