Logstash dead letter queue: "Event previously submitted to dead letter queue. Skipping..."

Problem description

I am trying to enable the Logstash dead letter queue, but I get the warning message below, and Elasticsearch does not receive any logs from Logstash.

[WARN ][org.logstash.common.io.DeadLetterQueueWriter][main] Event previously submitted to dead letter queue. Skipping...

The scenario I am testing:

Limit the Elasticsearch shard count, so the cluster cannot create a new shard and the logs end up in the dead letter queue:

curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "3" } }'

Then send a log from RabbitMQ to Logstash:

{index:"12345",id:1,message:"hello world"}

Increase the shard limit:

curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "10" } }'

When I check the dead letter queue through the Logstash API, I can see the logs in the queue, but Logstash does not send them on to Elasticsearch.
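
For reference, the queue size can be read from the Logstash monitoring API; a minimal sketch, assuming the API listens on its default port 9600 and the pipeline is named main:

curl -s 'http://logstash:9600/_node/stats/pipelines/main?pretty'
# the "dead_letter_queue" block of the response reports "queue_size_in_bytes";
# a growing value means events are being written to the DLQ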

logstash.yml:

pipeline:
  batch:
    size: 125
    delay: 5
dead_letter_queue.enable: true

logstash.conf:

input {
    udp {
        port => "${INPUT_UDP_PORT}"
        type => syslog
        codec => json
    }
    tcp {
        port => "${INPUT_TCP_PORT}"
        type => syslog
        codec => json_lines
    }
    http {
        port => "${INPUT_HTTP_PORT}"
        codec => "json"
    }
    rabbitmq {
        host => "${RABBIT_MQ_HOST}"
        queue => "log"
        durable => true
        exchange => "log"
        key => "log"
        threads => 3
        prefetch_count => 50
        port => 5672
        user => "${RABBIT_MQ_USERNAME}"
        password => "${RABBIT_MQ_PASSWORD}"
        type => "log"
    }
    dead_letter_queue {
        path => "/usr/share/logstash/data/dead_letter_queue"
        commit_offsets => false
        pipeline_id => "main"
    }
}

filter {
    if [logger_name] =~ "metrics" {
        kv {
            source => "message"
            field_split_pattern => ", "
            prefix => "metric_"
        }
        mutate {
            convert => { "metric_value" => "float" }
            convert => { "metric_count" => "integer" }
            convert => { "metric_min" => "float" }
            convert => { "metric_max" => "float" }
            convert => { "metric_mean" => "float" }
            convert => { "metric_stddev" => "float" }
            convert => { "metric_median" => "float" }
            convert => { "metric_p75" => "float" }
            convert => { "metric_p95" => "float" }
            convert => { "metric_p98" => "float" }
            convert => { "metric_p99" => "float" }
            convert => { "metric_p999" => "float" }
            convert => { "metric_mean_rate" => "float" }
            convert => { "metric_m1" => "float" }
            convert => { "metric_m5" => "float" }
            convert => { "metric_m15" => "float" }
            # No need to keep message field after it has been parsed
            remove_field => ["message"]
        }
    }
    if [type] == "syslog" {
        mutate {
            add_field => { "instance_name" => "%{app_name}-%{host}:%{app_port}" }
        }
    }
    mutate {
        # workaround from https://github.com/elastic/logstash/issues/5115
        add_field => { "[@metadata][LOGSTASH_DEBUG]" => "${LOGSTASH_DEBUG:false}" }
    }
    mutate {
      remove_field => "[geoip][location]"
    }
}

output {
    if [logger_name] =~ "metrics" {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
            index => "metrics-%{+YYYY.MM.dd}"
        }
    } else {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
            index => "logs-%{+YYYY.MM.dd}"
        }
    }
    if [@metadata][LOGSTASH_DEBUG] == "true" {
        stdout {
            codec => rubydebug
        }
    }

}

The Elasticsearch and Logstash version is 7.9.2.

Tags: elasticsearch, logstash, kibana, elastic-stack, elk

Solution


The message "Event previously submitted to dead letter queue. Skipping..." means that an event coming from the DLQ input (specifically, an event that carries DLQ metadata) was sent to the elasticsearch output, and the output tried to write it to the DLQ a second time. It skips the event, because endlessly looping the same rejected event through the queue would be pointless.

The DLQ documentation says: "The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried." You have to modify the event, based on the reason recorded in the DLQ metadata, before trying to send it to Elasticsearch again.
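
As an illustration, a minimal sketch of a dedicated reprocessing pipeline, run separately from main (the dlq-reprocessed-* index and the failure_reason field are hypothetical names; [@metadata][dead_letter_queue][reason] is the metadata field the DLQ writer populates):

input {
    dead_letter_queue {
        path => "/usr/share/logstash/data/dead_letter_queue"
        # commit offsets so entries are not re-read after a restart
        commit_offsets => true
        pipeline_id => "main"
    }
}

filter {
    # keep the rejection reason on the event so the root cause stays visible
    mutate {
        add_field => { "failure_reason" => "%{[@metadata][dead_letter_queue][reason]}" }
    }
    # fix whatever caused the 400/404 here, e.g. remove or rename the
    # offending field identified in failure_reason
}

output {
    elasticsearch {
        hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
        # write to a different index than the one that rejected the event
        index => "dlq-reprocessed-%{+YYYY.MM.dd}"
    }
}

Declaring this as its own pipeline in pipelines.yml keeps the repaired events away from the inputs and outputs that rejected them in the first place.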

