首页 > 解决方案 > 如何在基于 kafka 流的应用程序中降低延迟

问题描述

我有一个带有 3 个 kafka 机器集群的真实环境,它正在接收大量数据。对于每个主题,有 25 个分区,复制因子设置为 2。

我的应用程序(基于 kafka 流的应用程序)从这个 kafka 集群获取数据已关闭一个多月。现在,每个分区都有大量的滞后;达到90000000。

我知道以下参数:

max.poll.records ; default —> 500
max.partition.fetch.bytes ; default —> 1048576
fetch.max.bytes ; default —> 52428800
fetch.min.bytes ; default —> 1

max.poll.interval.ms ; default —> 300000
request.timeout.ms; default —> 30000
session.timeout.ms ; default —> 10000

我有 2 个消费者节点(使用来自 kafka 集群的数据的相同组 ID)。

但是,它并没有赶上滞后,它保持不变。谁能建议如何改进以降低延迟?

标签: apache-kafkaapache-kafka-streams

解决方案


如果您的应用程序关闭了一个月,一些记录已过期,因为主题中的默认保留时间为 7 天,因此您很可能丢失了一些消息。此外,默认偏移重置保留 1 天或 7 天,具体取决于您的 Kafka Streams 版本。似乎你有auto.offset.reset: earliest,所以它从每个分区的开头开始消耗消息。如果您需要跳过所有消息并仅使用新消息,则应设置auto.offset.reset: latest并将application.idvalue 更改为新消息。

如果您想并行消费消息并加快延迟减少,您可以将 config 设置num.stream.threads为某个值,例如12num.stream.threads * numberOfConsumerNodes应该小于或等于numberOfPartitions,否则某些线程将处于空闲状态),或者需要增加消费者节点的数量。


推荐阅读