elasticsearch - 如何从日志中的字段设置 Logstash 管道中 Elasticsearch 输出的操作?
问题描述
我的 Elasticsearch ETL 堆栈使用 Filebeat 从文件中获取 JSON 对象,并通过 Logstash 将它们传递给 Elasticsearch。我想在我的 JSON 对象中传递一个操作属性,该属性指定“更新”或“删除”作为我想要对索引中的文档执行的操作。我当前的配置具有硬连线到“更新”的操作,并且只会执行更新插入美好的。
JSON 示例
{ "key":123,"index_name":"companies","action":"update"}
{ "key":123,"index_name":"companies","action":"delete"}
当前管道.conf
input {
beats {
id => "filebeat-input"
port => 5044
codec => "json"
include_codec_tag => false
}
}
output {
elasticsearch {
id => "elasticsearch-output"
hosts => ["localhost:9200"]
document_id => "%{key}"
index => "%{[@metadata][index_name]}"
action => "update"
doc_as_upsert => true
manage_template => false
}
}
我试图这样做:
output {
elasticsearch {
id => "elasticsearch-output"
hosts => ["localhost:9200"]
document_id => "%{key}"
index => "%{[@metadata][index_name]}"
action => "%{action}"
doc_as_upsert => true
manage_template => false
}
}
但是,如果文档不存在,则它不会按预期进行更新,并且会记录一个document_missing_exception错误:
[2019-08-21T15:21:28,879][WARN][logstash.outputs.elasticsearch]
无法将事件索引到 Elasticsearch。
{:status=>404, :action=>["update", {:_id=>"123", :_index=>"companies", :_type=>"_doc", :routing=>nil, :retry_on_conflict= >1}, #],
:response=>{"update"=>{"_index"=>"companies", "_type"=>"_doc", "_id"=>"123", "status"=> 404,
"错误"=>{"type"=>"document_missing_exception", "reason"=>"[_doc][123]: 文档丢失", "index_uuid"=>"uU9oXFtZSXGodoh70YG3Ng", "shard"=>"0 ", "索引"=>"公司"}}}}
解决方案
推荐阅读
- cassandra - 我们如何在 Cassandra 中进行空间查询?Cassandra 是否有任何 GIS 扩展?
- css - SCSS - 检查字体系列的变量
- apache-spark - 如何强制 Spark/Hive 创建具有自定义权限的 task_* 目录
- python - NLP:是否有任何模型可以生成具有自定义长度的句子嵌入?
- xamarin.forms - 如何在 HttpClient Post 中发送 XML 数据
- performance - 有人可以解释为什么 SDL 窗口在几秒钟后变灰吗?
- javascript - 广告内的服务器端图库(php、js?、ajax)
- jquery - JQuery 捕获两个不同元素上的两个不同事件以运行相同的功能
- sql - 使用更新语句删除行
- docker - 从 Gitlab CI/CD 管道执行 docker 命令时权限被拒绝