首页 > 解决方案 > Kafka Connect fetch.max.wait.ms 和 fetch.min.bytes 组合不兑现?

问题描述

我正在使用 Kafka Connect (2.3.0) 创建一个自定义 SinkConnector,需要针对吞吐量而不是延迟进行优化。理想情况下,我想要的是:

大约 20 兆字节或 100k 的批次记录先出现,但如果消息速率较低,则至少每分钟处理一次(避免小批量,但 MySinkTask.put() 最低速率为每分钟)。

这是我为消费者设置设置的,以尝试完成它:

现在,我在低速率情况下观察到的是 MySinkTask.put() 多次调用 0 记录,几分钟过去了,直到达到 fetch.min.bytes,然后我一次得到它们。

到目前为止我无法理解:

我已经仔细检查了日志输出,INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:Connect Runtime 打印的行显示了我通过consumer.前缀值传递的预期值。

标签: apache-kafkaapache-kafka-connect

解决方案


“至少在每个间隔处理”部分似乎是不可能的,因为fetch.min.bytes消费者设置优先,并且 Connect 不允许您在任务运行时动态调整 ConsumerConfig。:-(

目前的解决方法是手动批处理任务;设置fetch.min.bytes为 1(yikes),在 put() 调用时缓冲 Task 中的记录,并在必要时刷新。这不是很理想,因为它为我希望避免的连接器推断出一些开销。

Connect 如何从消费者的轮询到 SinkTask.put() 每秒执行约 2 次批处理的逻辑对我来说仍然是一个谜,但它比每条消息都被调用要好。


推荐阅读