apache-kafka - 如何在基于 kafka 流的应用程序中降低延迟
问题描述
我有一个带有 3 个 kafka 机器集群的真实环境,它正在接收大量数据。对于每个主题,有 25 个分区,复制因子设置为 2。
我的应用程序(基于 kafka 流的应用程序)从这个 kafka 集群获取数据已关闭一个多月。现在,每个分区都有大量的滞后;达到90000000。
我知道以下参数:
max.poll.records ; default —> 500
max.partition.fetch.bytes ; default —> 1048576
fetch.max.bytes ; default —> 52428800
fetch.min.bytes ; default —> 1
max.poll.interval.ms ; default —> 300000
request.timeout.ms; default —> 30000
session.timeout.ms ; default —> 10000
我有 2 个消费者节点(使用来自 kafka 集群的数据的相同组 ID)。
但是,它并没有赶上滞后,它保持不变。谁能建议如何改进以降低延迟?
解决方案
如果您的应用程序关闭了一个月,一些记录已过期,因为主题中的默认保留时间为 7 天,因此您很可能丢失了一些消息。此外,默认偏移重置保留 1 天或 7 天,具体取决于您的 Kafka Streams 版本。似乎你有auto.offset.reset: earliest
,所以它从每个分区的开头开始消耗消息。如果您需要跳过所有消息并仅使用新消息,则应设置auto.offset.reset: latest
并将application.id
value 更改为新消息。
如果您想并行消费消息并加快延迟减少,您可以将 config 设置num.stream.threads
为某个值,例如12
(num.stream.threads * numberOfConsumerNodes
应该小于或等于numberOfPartitions
,否则某些线程将处于空闲状态),或者需要增加消费者节点的数量。
推荐阅读
- javascript - Javascript 函数将我的值转换为 undefined。我和我的老师不明白为什么
- android - Android Studio 布局预览默认主题
- jmeter - 发出 HTTP 请求时 jmeter 显示错误
- jquery - 如果只有一个孩子,我如何将课程应用于父 UL
- python - 替换字符之间的点
- javascript - 这是发送空请求的正确方法吗?
- java - 使用比较器对字符串列表进行排序
- autohotkey - autohotkey 网页抓取代码优化
- c# - 如何在 Xamarin.Forms 中获取以前的数据
- javascript - Javascript:未定义的表