apache-kafka - Flume KafkaChannel 跨分区平衡消息
问题描述
我设置了一个水槽代理,它使用 SpoolDir 命令从目录中读取 CSV 文件。
我正在使用通道类型 KafkaChannel 来将这些消息推送到具有 10 个分区的 Kafka 主题上,稍后可以由 Spark 应用程序处理。
我遇到的问题是每个文件都写入 Kafka 主题的单个分区。一些文件比其他文件大得多,这意味着消息在主题分区中的分布非常不均匀。这使得为我的 spark executors 分配正确数量的资源变得异常困难,因为有些人最终完成了所有繁重的工作,而另一些人则坐在那里等待将一些日志添加到他们的分区中。
有没有办法在flume中配置KafkaChannel来平衡主题分区之间的消息,或者限制一次发送的消息数量,从而将负载分散到所有可用分区?
我在水槽中使用了以下配置选项但没有成功:
a1.channels.kafkaChannel.capacity = 100
a1.channels.kafkaChannel.transactionCapacity = 100
a1.channels.kafkaChannel.batch.size = 100
KafkaChannel 源代码已经稍作修改以满足我的需要,但这里指定的默认配置选项仍然可用: http: //flume.apache.org/FlumeUserGuide.html#kafka-channel
完整的配置文件(我已经删除了主机名和其他关键信息)
a1.sources = src
a1.channels = kafkaChannel
a1.sources.src.type = spooldir
a1.sources.src.channels = kafkaChannel
a1.sources.src.spoolDir = /data/silk/flume/V5
a1.sources.src.fileHeader = true
a1.sources.src.trackerDir = .flumespool
a1.sources.src.consumeOrder = oldest
a1.sources.src.deletePolicy = immediate
a1.sources.src.decodeErrorPolicy = REPLACE
a1.sources.src.pollDelay = 12000
a1.channels.kafkaChannel.type = com.example.flume.channel.kafka.KafkaChannel
a1.channels.kafkaChannel.brokerList = host1:9092,host2:9092,host3:9092
a1.channels.kafkaChannel.topic = TEST-TOPIC
a1.channels.kafkaChannel.capacity = 100
a1.channels.c1.transactionCapacity = 100
a1.channels.kafkaChannel.zookeeperConnect = host1:2181,host2:2181,host3:2181
a1.channels.kafkaChannel.parseAsFlumeEvent = false
任何帮助表示赞赏,在此先感谢!
解决方案
对于其他面临此问题的人,我找到了一种解决方法:
通过实现 MemoryChannel 和 KafkaSink,而不是将日志直接推送到 KafkaChannel,我可以看到消息在我的 Kafka 主题分区中更加均匀地平衡。
推荐阅读
- javascript - 当该字段是外键时,如何使用 OR 条件进行查询
- postgresql - postgres 无法创建复制槽
- python - 如何使用python打开文件,将行拆分为列表,然后在第三个列表中搜索具有最高值的行
- c - 使用枚举修改 C 中的打印语句
- ruby-on-rails - 如何在移动性中使用语言环境或失败访问器验证翻译?
- python - Python中的最佳拟合线不准确
- javascript - jQuery preventDefault() 不能与 .on() 一起使用
- google-apps-script - 扩展 Google 表格脚本
- javascript - 如何更新 WeakMap 中现有键的值?
- sequelize.js - Sequelize - 根据另一个字段的值返回新字段