apache-kafka - Kafka Connect fetch.max.wait.ms 和 fetch.min.bytes 组合不兑现?
问题描述
我正在使用 Kafka Connect (2.3.0) 创建一个自定义 SinkConnector,需要针对吞吐量而不是延迟进行优化。理想情况下,我想要的是:
大约 20 兆字节或 100k 的批次记录先出现,但如果消息速率较低,则至少每分钟处理一次(避免小批量,但 MySinkTask.put() 最低速率为每分钟)。
这是我为消费者设置设置的,以尝试完成它:
- 消费者.max.poll.records=100000
- 消费者.fetch.max.bytes=20971520
- 消费者.fetch.max.wait.ms=60000
- 消费者.max.poll.interval.ms=120000
消费者.fetch.min.bytes=1048576
我需要这个 fetch.min.bytes 设置,否则 MySinkTask.put() 每秒调用多次,尽管有其他设置......?
现在,我在低速率情况下观察到的是 MySinkTask.put() 多次调用 0 记录,几分钟过去了,直到达到 fetch.min.bytes,然后我一次得到它们。
到目前为止我无法理解:
- 为什么 fetch.max.wait.ms=60000 没有从消费者向下推到我的连接器的 put() 调用?那不应该优先于 fetch.min.bytes 吗?
- 如果 fetch.min.bytes=1 (默认),什么设置控制对 MySinkTask.put() 的每秒约 2 次调用?我不明白为什么会这样,即使 Connect 运行时设置的详细输出也没有显示任何低于秒数的间隔。
我已经仔细检查了日志输出,INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
Connect Runtime 打印的行显示了我通过consumer.
前缀值传递的预期值。
解决方案
“至少在每个间隔处理”部分似乎是不可能的,因为fetch.min.bytes
消费者设置优先,并且 Connect 不允许您在任务运行时动态调整 ConsumerConfig。:-(
目前的解决方法是手动批处理任务;设置fetch.min.bytes
为 1(yikes),在 put() 调用时缓冲 Task 中的记录,并在必要时刷新。这不是很理想,因为它为我希望避免的连接器推断出一些开销。
Connect 如何从消费者的轮询到 SinkTask.put() 每秒执行约 2 次批处理的逻辑对我来说仍然是一个谜,但它比每条消息都被调用要好。
推荐阅读
- javascript - 通过比较两个数组获得总和
- perl - 解析命令行参数,但不验证
- sql - 如何计算 postgreSQL 中两天的时间差并以天数返回?
- javascript - 如何将 ExtJS“复选框”(按钮)与 Javascript/JQuery 同步?
- flutter - Flutter:Flutter Video Player 无法播放视频文件 TWICE [视频控制器处理后无法使用]~
- php - first() 的替代方法是在 laravel 中导入整个字符串
- openshift - 无法连接存在于 Openshift 集群之外的外部 redis 集群
- php - 如何使用 Alphatab 的播放速度
- paypal - Paypal 给出了无效的刷新令牌错误,并且未授予同意 Oauth2
- makefile - 将变量从 sub-make 传递到 main make