apache-flink, flink-streaming, flink-sql

Unable to decode avro data from Kafka with avro schema


I'm trying to port our streaming pipeline to the Table API, and I've almost managed it, except for one field.

I'm reading CSV data from a Kafka topic, applying some transformations, and sending the transformed data to an output topic in Avro format. The Avro schema has some complex fields, and one particular field is giving me trouble: I'm able to send my data to the Kafka topic in Avro format, but I can't read it back using my schema. If I remove that column, I can both write the data and read it back, and with the Table API I can read everything back.
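For reference, the overall pipeline looks roughly like this (Flink 1.14+ TableDescriptor API; the topic names, servers, group id, and the single payload column are placeholders, not our real schema, and the transformations are elided):

    import org.apache.flink.table.api._

    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // CSV source topic (placeholder schema and options)
    tEnv.createTemporaryTable("csv_in", TableDescriptor.forConnector("kafka")
      .schema(Schema.newBuilder().column("payload", DataTypes.STRING()).build())
      .option("topic", "topic-in")
      .option("properties.bootstrap.servers", "localhost:9092")
      .option("properties.group.id", "demo")
      .option("scan.startup.mode", "earliest-offset")
      .format("csv")
      .build())

    // Avro sink topic (placeholder schema and options)
    tEnv.createTemporaryTable("avro_out", TableDescriptor.forConnector("kafka")
      .schema(Schema.newBuilder().column("payload", DataTypes.STRING()).build())
      .option("topic", "topic-out")
      .option("properties.bootstrap.servers", "localhost:9092")
      .format("avro")
      .build())

    // transformations elided; write through to the Avro topic
    tEnv.from("csv_in").executeInsert("avro_out")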

Avro schema for field:

{
      "name": "problem_field",
      "type": [
        "null", {
          "type": "array",
          "items": {
            "type": "record",
            "name": "ProblemField",
            "doc": "Description of the problem field",
            "fields": [
              {
                "name": "proc",
                "type": "string"
              }, {
                "name": "success",
                "type": "boolean"
              }
            ]
          }
        }
      ],
      "doc": "some doc here",
      "default": null
},

Logical Type for field:

ARRAY<ROW<`proc` STRING NOT NULL, `success` BOOLEAN NOT NULL> NOT NULL>

My table schema:

.column("problem_field", DataTypes.ARRAY(
    ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull()))
))
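One way to cross-check a hand-written column type is to let Flink derive the DataType directly from the Avro schema and compare the two. AvroSchemaConverter ships with flink-avro; avroSchemaJson is assumed to hold the full schema as a string:

    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter

    // Derive the Flink DataType from the Avro schema itself; any nullability
    // difference from the hand-written .column(...) definition shows up here.
    val derived = AvroSchemaConverter.convertToDataType(avroSchemaJson)
    println(derived)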

Error I'm receiving:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25

My consumer:

    // Imports needed by this snippet; fs, jobArgs, setSourceStream and
    // writeOutputAsText are project-specific helpers not shown here.
    import org.apache.avro.Schema
    import org.apache.hadoop.fs.Path
    import org.apache.flink.formats.avro.AvroDeserializationSchema
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

    // Parse the writer's Avro schema from a file and build a deserializer for it
    val schema = new Schema.Parser().parse(fs.open(new Path(jobArgs("schema"))))

    val deser = AvroDeserializationSchema.forGeneric(schema)
    val kafka_source = setSourceStream(deser)
    val stream = env.addSource(kafka_source)(deser.getProducedType)

    // Dump the decoded records as JSON text files for inspection
    val config = OutputFileConfig.builder().withPartPrefix("source-part").withPartSuffix(".json").build()
    writeOutputAsText(stream, s"${jobArgs("extracts_path")}", config).setParallelism(1).name("JSON")
    env.execute("Consumer")

I think my problem is in the data type extraction: when I write the output, the field gets encoded differently than the Avro schema specifies.

Could someone point me in the right direction to solve this?


Solution

  • I think I found the answer to my problem: I forgot the NOT NULL for the ROW in my ARRAY. The Avro schema declares the array items as a plain record rather than a ["null", ...] union, and Flink maps a nullable type to exactly such a union, which changes the binary encoding; the mismatched nullability is what corrupted decoding.

    Solution:

    .column("problem_field", DataTypes.ARRAY(
        ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull())).notNull()
    ))
    

    Now I'm able to read everything back.
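    To see the encoding difference concretely, here's a minimal, Flink-free sketch with plain Avro; the record schema and values are made up for illustration:

    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumWriter}
    import org.apache.avro.io.EncoderFactory

    val recordSchema = new Schema.Parser().parse(
      """{"type":"record","name":"ProblemField","fields":[
        |{"name":"proc","type":"string"},{"name":"success","type":"boolean"}]}""".stripMargin)
    val unionSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), recordSchema)

    val rec = new GenericData.Record(recordSchema)
    rec.put("proc", "step-1")
    rec.put("success", true)

    def encode(schema: Schema, value: AnyRef): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val enc = EncoderFactory.get().binaryEncoder(out, null)
      new GenericDatumWriter[AnyRef](schema).write(value, enc)
      enc.flush()
      out.toByteArray
    }

    // The union encoding carries one extra leading byte (the branch index).
    // A reader whose schema disagrees on nullability misreads that byte as
    // part of the next value, which is how lengths end up negative.
    println(encode(recordSchema, rec).length) // 8
    println(encode(unionSchema, rec).length)  // 9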