Tags: apache-kafka, logstash, avro, elastic-stack

Logstash Kafka - Error: negative length -62 given


I am facing an issue similar to the one described in this thread: https://discuss.elastic.co/t/argumenterror-when-using-kafka-input-avro-codec/116975

Logstash Conf:

input {
    kafka {
        group_id => "group_1"
        topics => ["topic_1"]
        bootstrap_servers => "192.168.0.1:9092"
        codec => avro {
            schema_uri => "/files/GA6/logstash-6.0.0/CONFIG_HOME/myschema.avsc"
        }
    }
}

output {
    stdout { }
}

Error Logs:

[2018-01-25T11:54:37,060][FATAL][logstash.runner          ] An unexpected error occurred! 
{:error=>#<ArgumentError: negative length -15 given>, :backtrace=>[
"org/jruby/ext/stringio/StringIO.java:788:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"
]}
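
For context: Avro encodes string/bytes lengths as zig-zag varints, so a decoder that starts reading at the wrong byte will interpret a data byte as a length and often get a negative number. A quick sketch of the decoding (the two example bytes are hypothetical, chosen because they decode to the lengths mentioned in this post):

def zigzag_decode(v):
    # Avro stores int/long values (including string/bytes lengths)
    # as zig-zag varints; a single byte below 0x80 is a complete varint
    return (v >> 1) ^ -(v & 1)

print(zigzag_decode(0x7B))  # 123, ASCII '{' -> -62 (JSON bytes read as Avro?)
print(zigzag_decode(0x1D))  # 29 -> -15, the length from the backtrace above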

Schema Sample:

{
    "type": "record",
    "name": "Sample",
    "doc": "Sample Schema",
    "fields": [{
            "name": "name",
            "type": "string"
        }, {
            "name": "address",
            "type": "string"
        }, {
            "name": "salary",
            "type": "long"
        }
    ]
}
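
For reference, the plain avro codec expects each Kafka message value to be the raw Avro binary encoding of this schema, with no extra framing. A minimal producer-side sketch using the avro Python package (the field values are hypothetical):

import io
import avro.io
import avro.schema

# Encode one record exactly as the Logstash avro codec expects:
# raw Avro binary with no header or framing
schema = avro.schema.parse(open("myschema.avsc").read())
writer = avro.io.DatumWriter(schema)
buf = io.BytesIO()
writer.write({"name": "jane", "address": "earth", "salary": 1000},
             avro.io.BinaryEncoder(buf))
payload = buf.getvalue()  # send these bytes as the Kafka message value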

Based on some related discussions, I have also added the following deserializer settings:

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

But the issue still persists.
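
One way to check what is actually on the topic is to peek at the first bytes of a message; here is a sketch with the kafka-python package, broker and topic taken from the config above:

from kafka import KafkaConsumer

consumer = KafkaConsumer("topic_1",
                         bootstrap_servers="192.168.0.1:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for msg in consumer:
    print(msg.value[:5].hex())
    # 00 + 4-byte schema ID -> Confluent wire format, not plain Avro
    # 7b (ASCII '{')        -> JSON payload, not Avro at all
    break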

Let me know if you require any further information.


Solution

  • The issue was eventually raised with the Logstash team, and support was integrated into the Logstash Kafka input itself rather than the Avro codec:

    Usage:

    input {
      kafka {
        id => "kafka_avro_events"
        group_id => "logstash_kafka_avro"
        topics_pattern => "some_avro_topic"
        bootstrap_servers => "kafka:9092"
        decorate_events => true
        value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
        schema_registry_url => "http://schemaregistry:8081"
        metadata_max_age_ms => "5000"
        codec => "json"
      }
    }
    

    Please ensure you are using Logstash 7.10.x or later; the schema registry options shown above are only available in recent versions of the Kafka input plugin.

    GitHub pull request: https://github.com/logstash-plugins/logstash-input-kafka/pull/239
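
  • For background (this assumes the messages were produced with Confluent's KafkaAvroSerializer): the serializer prepends a 5-byte header to the Avro body, a 0x00 magic byte followed by a big-endian 4-byte schema registry ID. The plain avro codec knows nothing about this header and misreads its bytes as Avro lengths, which is a common cause of these "negative length" errors. A sketch of splitting such a frame:

    import struct

    def split_confluent_frame(value):
        # Confluent wire format: 0x00 magic byte, big-endian 4-byte
        # schema registry ID, then the raw Avro binary body
        magic, schema_id = struct.unpack(">bI", value[:5])
        if magic != 0:
            raise ValueError("not Confluent wire format")
        return schema_id, value[5:]

    The KafkaAvroDeserializer in the config above understands this framing and fetches the writer schema from the registry by ID, which is why handling Avro in the Kafka input itself resolves the error.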