apache-kafka - 水槽:为分区 kafka sink 分配密钥
问题描述
我正在处理一个问题,但在水槽文档中也找不到任何答案来解决它。我想取尾文件的绝对路径并保存。在我想将它作为键传递给 kafka sink 之后,以便在同一分区中拥有具有相同路径的所有事件。我读过很多文章说这是可能的,但我找不到要分配的配置以使其工作。有人可以给我有关如何配置代理的参考或示例吗?
我有以下代理配置:
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10
tnk :)
解决方案
最后,我找到了如何配置代理,以便使用数据源中文件的绝对路径为 kafka 主题中的 partitionin 分配密钥。更详细地说,需要设置属性'fileHeaderKey=key'。这样,当事件传递给kafka sink时,header包含对key =absolute/path/of /the/file,kafka可以使用它作为其消息中的关键。
agent3a.sources= source3a
agent3a.channels= channel3a
agent3a.sinkss= sink3a
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the
key for partitioning###
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10
推荐阅读
- javascript - 如何在具有视差效果的滚动时禁用(不隐藏)滚动垂直条
- ruby-on-rails - 如何在活动记录查询中使用正则表达式?
- java - java.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException 启动 Tomcat 服务器时
- python - 如何检查表单是否不包含任何数据
- c# - PInvoke c++ dll 使堆栈不平衡
- c# - 读入 DataTable 时的列类型
- python - 每次迭代后将列表中的值存储到数组中包含python中的指定文件列
- php - 如何在 laravel .env 文件中配置发件人邮件地址
- apache-spark - 如果两个 RDD 依赖于另一个 RDD 火花怎么办
- android - 在 Android 手机主屏幕上检测触摸或滑动