elasticsearch - 将 Logstash 输入发送到多个输出
问题描述
我有以下 Logstash 配置:
input {
kafka {
bootstrap_servers => "svc-kafka:9093"
topics => ["ELK.LOG_EVENT.PROC", "ELK.API_ANALYTICS.PROC"]
codec => "json"
decorate_events => true
}
}
output {
if [kafka][topic] == "ELK.LOG_EVENT.PROC" {
elasticsearch {
hosts => ["svc-es:9200"]
index => "elklogevent-%{+YYYY.MM.dd}"
document_id => "%{id}"
}
} else {
elasticsearch {
hosts => ["svc-es:9200"]
index => "elkapianalytics-%{+YYYY.MM.dd}"
document_id => "%{id}"
}
}
}
但我收到以下错误:
[2018-10-11T13:16:30,035][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 24, column 1 (byte 514) after ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:171:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:335:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:332:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:319:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}
尽管它明确表示对第 24 行不满意,但我不知道诊断问题的最佳方法是什么?一般来说,我对 Ruby 语言和/或 Logstash 不太熟悉。
此片段基于此 SO 帖子:logstash 5.0.1:为多个 kafka 输入主题设置弹性搜索多个索引输出
stdout { codec => rubydebug }
--- 编辑:包含内部output {}
块后的示例调试输出---
{
"message" => "Started Application in 15.296 seconds (JVM running for 16.37)",
"@version" => "1",
"loggerFqcn" => "org.apache.commons.logging.LogFactory$Log4jLog",
"threadPriority" => 5,
"timestamp" => "2018-10-11T14:39:35.984+0000",
"level" => "INFO",
"threadId" => 1,
"hostname" => "deploy-obfuscated-service-59ffb8957d-rbgs6",
"endOfBatch" => false,
"loggerName" => "com.abc.obfuscated.Application",
"service" => "obfuscated-service",
"thread" => "main",
"timeMillis" => 1539268775984,
"@timestamp" => 2018-10-11T14:39:35.987Z
}
解决方案
使用decorate_events => true
时[kafka][topic]
字段实际上是添加到@metadata
字段中的,因此您只需将配置更改为:
if [@metadata][kafka][topic] == "ELK.LOG_EVENT.PROC" {
推荐阅读
- angular - 在嵌套表单元素上使用角度材料表单字段错误消息
- amazon-web-services - 尝试调用 Go AWS Lambda 函数时权限被拒绝
- generics - 规范文件 (.ads) 中的不可见声明
- string - 检查字符串是否是另一个字符串的子序列但不是子词的算法
- java - 为 Future 回调编写 junit 测试用例
- python - 用于 OCR 的 Python OpenCV 偏斜校正
- mysql - 如何从每个月获取 MIN 和 MAX 日期
- reactjs - 如何在 Gatsby 中映射数组?
- javascript - 用户使用firebase和javascript注册时如何发送电子邮件验证
- istio - Istio 虚拟服务超时是否可以限制在特定的请求时间