首页 > 解决方案 > 如何从日志中的字段设置 Logstash 管道中 Elasticsearch 输出的操作?

问题描述

我的 Elasticsearch ETL 堆栈使用 Filebeat 从文件中获取 JSON 对象,并通过 Logstash 将它们传递给 Elasticsearch。我想在我的 JSON 对象中传递一个操作属性,该属性指定“更新”或“删除”作为我想要对索引中的文档执行的操作。我当前的配置具有硬连线到“更新”的操作,并且只会执行更新插入美好的。

JSON 示例

{ "key":123,"index_name":"companies","action":"update"}
{ "key":123,"index_name":"companies","action":"delete"}

当前管道.conf

input { 
    beats {
        id => "filebeat-input"
        port => 5044
        codec => "json"
        include_codec_tag => false
    }
} 
output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "update"
      doc_as_upsert => true
      manage_template => false
    }
}

我试图这样做:

output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "%{action}"
      doc_as_upsert => true
      manage_template => false
    }
}

但是,如果文档不存在,则它不会按预期进行更新,并且会记录一个document_missing_exception错误:

[2019-08-21T15:21:28,879][WARN][logstash.outputs.elasticsearch]
无法将事件索引到 Elasticsearch。
{:status=>404, :action=>["update", {:_id=>"123", :_index=>"companies", :_type=>"_doc", :routing=>nil, :retry_on_conflict= >1}, #],
:response=>{"update"=>{"_index"=>"companies", "_type"=>"_doc", "_id"=>"123", "status"=> 404,
"错误"=>{"type"=>"document_missing_exception", "reason"=>"[_doc][123]: 文档丢失", "index_uuid"=>"uU9oXFtZSXGodoh70YG3Ng", "shard"=>"0 ", "索引"=>"公司"}}}}

标签: elasticsearchlogstashlogstash-configuration

解决方案


推荐阅读