json - 使用 JSON 字段值作为带有 filebeat 的 Kafka 主题的记录键
问题描述
我在文件中有一个事件(JSON 消息),需要通过 filebeat发送到 Kafka 。JSON 消息如下所示:
{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}
我想将此消息发送给 Kafka。分区键应该是 JSON 事件消息中的应用程序字段。如何在 JSON 消息中提供自定义应用程序字段作为 Kafka 记录的分区键?
filebeat.yml 是这样的:
…
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[message.application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
compression: none
https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf 根据this reference,我们可以使用格式字符串规范来引用事件字段值。
使用此配置,默认消息“默认”始终报告为键。如何配置 filebeat.yml 以提取自定义应用程序字段并将此信息用作 Kafka 分区键?
此外,我尝试在输入部分定义一个字段,如下所示:
type: log
enabled: true
paths:
- /var/logs/*event.log
fields:
log_topic: "event"
application: '%{[application]} string'
fields_under_root: true
和相应的kafka输出为:
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
但随后 kafka 分区键始终为:%{[application]} string
解决方案
假设 JSON 被实际解析,我想你想要
key: '%{[fields.application]:default}'
见例子 - https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic-option-kafka
您可能还对添加主机元数据感兴趣 - https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html
并解码 JSON - https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html
推荐阅读
- android - 如何将 ffmpeg gl-transition 项目集成到我现有的项目中
- artifactory - JFrog Artifactory 无法启动 artdb.db_properties' 不存在
- c++ - 是否可以为 QScatterSeries 中的每个点设置单独的标记大小?
- css - CSS 媒体查询未按预期工作
- laravel - 在 laravel 中用雄辩的联合和急切加载
- antlr4 - Antlr4 getParent 当前上下文
- java - 在 C/C++ 中使用布尔数组参数调用 JAVA 的布尔数组方法
- python - 如何在python中隐藏导入行?
- reactjs - Reactjs Leaflet 自定义控制按钮
- python - 如何合并两个在不同数据流上训练的 CNN?