首页 > 解决方案 > 使用 JSON 字段值作为带有 filebeat 的 Kafka 主题的记录键

问题描述

我在文件中有一个事件(JSON 消息),需要通过 filebeat发送到 Kafka 。JSON 消息如下所示:

{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}

我想将此消息发送给 Kafka。分区键应该是 JSON 事件消息中的应用程序字段。如何在 JSON 消息中提供自定义应用程序字段作为 Kafka 记录的分区键?

filebeat.yml 是这样的:

…
        output.kafka:
          version: 0.10.1
          hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
          topic: '%{[log_topic]}'
          codec.format:
            string: '%{[message]}'
          key: '%{[message.application]:default}'
          partition.hash:
            hash: []
            random: true # if false non-hashable events will be dropped
          required_acks: 1
          compression: none

https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf 根据this reference,我们可以使用格式字符串规范来引用事件字段值。

使用此配置,默认消息“默认”始终报告为键。如何配置 filebeat.yml 以提取自定义应用程序字段并将此信息用作 Kafka 分区键?

此外,我尝试在输入部分定义一个字段,如下所示:

type: log
  enabled: true
  paths:
  - /var/logs/*event.log
  fields:
    log_topic: "event"
    application: '%{[application]} string'
  fields_under_root: true

和相应的kafka输出为:

output.kafka:
  version: 0.10.1
  hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
  topic: '%{[log_topic]}'
  codec.format:
    string: '%{[message]}'
  key: '%{[application]:default}'

  partition.hash:
    hash: []
    random: true # if false non-hashable events will be dropped
  required_acks: 1

但随后 kafka 分区键始终为:%{[application]} string

标签: jsonapache-kafkaelastic-stackfilebeatelastic-beats

解决方案


假设 JSON 被实际解析,我想你想要

key: '%{[fields.application]:default}'

见例子 - https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic-option-kafka

您可能还对添加主机元数据感兴趣 - https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html

并解码 JSON - https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html


推荐阅读