首页 > 解决方案 > logstash kafka 输入具有不同编解码器的多个主题

问题描述

这是我的logstash conf

input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["filebeat", "access"]
    group_id => "test-consumer-group"
    consumer_threads => 1
    decorate_events => true
  }
}

我有两个主题,但我想为不同的主题使用不同的编解码器。我怎样才能做到这一点?

我尝试添加

    if ([topic] == "filebeat") {
      codec => "json"
    }

在 kafka 输入配置中,logstash 返回错误。

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 6, column 8 (byte 143) after input {\n  kafka {\n    bootstrap_servers => \"127.0.0.1:9092\"\n    topics => [\"filebeat\", \"access\"]\n    group_id => \"test-consumer-group\"\n    if "

标签: apache-kafkalogstash

解决方案


您可以使用不同的编解码器创建 2 个单独的 kafka 输入。

另一种选择是添加一个过滤器,根据主题解析 json 对象

filter {
  if([topic] == "filebeat") {
    json {
      source => "message"
    }
  }
}

更多信息检查: https ://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html


推荐阅读