Logstash Kafka - Error: negative length -62 given

Problem description

I am facing a problem similar to the one in this post: https://discuss.elastic.co/t/argumenterror-when-using-kafka-input-avro-codec/116975

Logstash configuration:

input {
    kafka{
        group_id => "group_1"
        topics => ["topic_1"]
        bootstrap_servers => "192.168.0.1:9092"
        codec => avro {
            schema_uri => "/files/GA6/logstash-6.0.0/CONFIG_HOME/myschema.avsc"
        }
    }
}

output{
    stdout{

    }
}

Error log:

[2018-01-25T11:54:37,060][FATAL][logstash.runner          ] An unexpected error occurred! 
{:error=>#<ArgumentError: negative length -15 given>, :backtrace=>[
"org/jruby/ext/stringio/StringIO.java:788:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"
]}

Sample schema:

{
    "type": "record",
    "name": "Sample",
    "doc": "Sample Schema",
    "fields": [{
            "name": "name",
            "type": "string"
        }, {
            "name": "address",
            "type": "string"
        }, {
            "name": "salary",
            "type": "long"
        }
    ]
}

Based on some discussions, I also added the following to the kafka input:

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

But the problem persists.

Please let me know if you need any further information.

Tags: apache-kafka, logstash, avro, elastic-stack

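Solution

This issue was actually raised with the Logstash team, and the fix was eventually integrated into the Logstash Kafka input itself rather than into the codec:

Usage: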

input {
  kafka {
    id => "kafka_avro_events"
    group_id => "logstash_kafka_avro"
    topics_pattern => "some_avro_topic"
    bootstrap_servers => "kafka:9092"
    decorate_events => true
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    schema_registry_url => "http://schemaregistry:8081"
    metadata_max_age_ms => "5000"
    codec => "json"
  }
}

Make sure you are using a version newer than 7.10.x.

GitHub pull request: https://github.com/logstash-plugins/logstash-input-kafka/pull/239
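
For background on why a schema-registry-aware deserializer helps, here is a minimal Python sketch. It assumes (the question does not confirm this) that the messages were produced with Confluent's Avro serializer, which prepends a magic byte 0 and a 4-byte big-endian schema ID to the Avro payload. A plain Avro decoder that treats those header bytes as data can read one of them as a string length, and Avro lengths are zig-zag varints, so an odd byte decodes to a negative number. The function names and the schema ID 29 are hypothetical and chosen only for illustration.

```python
import struct


def read_long(buf: bytes, pos: int = 0):
    """Decode one Avro long (zig-zag varint) from buf; return (value, next_pos)."""
    raw = 0
    shift = 0
    while True:
        b = buf[pos]
        pos += 1
        raw |= (b & 0x7F) << shift
        if not (b & 0x80):  # high bit clear means this was the last byte
            break
        shift += 7
    # Zig-zag decoding: even raw values are non-negative, odd ones negative.
    return (raw >> 1) ^ -(raw & 1), pos


def strip_confluent_header(message: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("message is not in Confluent wire format")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


# A header byte misread as a string length yields a negative value:
# 0x1D decodes to -15 (as in the log) and 0x7B decodes to -62 (as in the title).
misread_log, _ = read_long(b"\x1d")
misread_title, _ = read_long(b"\x7b")

# Hypothetical framed message: schema ID 29, then the Avro string "Al".
framed = b"\x00" + struct.pack(">I", 29) + b"\x04Al"
schema_id, payload = strip_confluent_header(framed)
length, pos = read_long(payload)
name = payload[pos:pos + length].decode("utf-8")
```

If that assumption holds, the payload only decodes correctly once the 5-byte header is removed and the schema is looked up by its ID, which is the work `KafkaAvroDeserializer` does together with `schema_registry_url` in the configuration above.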

