Tags: java, apache-kafka, parquet, avro, apache-kafka-connect

Parquet files to Avro and Kafka SourceRecord


I'm trying to convert Parquet files into SourceRecords. I'm currently stuck on generating a Connect schema from the Avro schema. I'm able to read Parquet files into GenericRecords:

// Imports inferred from the snippet (vavr, parquet-avro, hadoop-aws):
import static io.vavr.API.Try;
import static org.apache.parquet.avro.AvroReadSupport.AVRO_COMPATIBILITY;

import io.vavr.collection.Seq;
import io.vavr.control.Try;
import java.util.LinkedList;
import java.util.List;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.parquet.avro.AvroDataSupplier;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetReader.Builder;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public static Seq<GenericRecord> genericRecordsOf(Seq<String> parquets) {
    Configuration config = new Configuration();
    // Make parquet-avro emit Avro-compatible records and schemas.
    config.setBoolean(AVRO_COMPATIBILITY, true);
    config.setBoolean("parquet.avro.add-list-element-records", false);
    config.setBoolean("parquet.avro.write-old-list-structure", false);
    // SchemaTest (the enclosing class) implements AvroDataSupplier.
    config.setClass("parquet.avro.data.supplier", SchemaTest.class, AvroDataSupplier.class);
    config.set("fs.s3a.impl", S3AFileSystem.class.getCanonicalName());
    return parquets.flatMap(input -> {
        Builder<Record> builder = Try(() -> AvroParquetReader
            .<Record>builder(HadoopInputFile.fromPath(new Path(input), config)))
            .get();
        return readRecords(builder);
    });
}

// Builds the reader and drains it, closing the reader when done.
private static List<GenericRecord> readRecords(Builder<Record> builder) {
    return Try
        .withResources(builder::build)
        .of(SchemaTest::readRecords)
        .get();
}

// Reads records until the reader returns null (end of input).
private static List<GenericRecord> readRecords(ParquetReader<Record> reader) {
    List<GenericRecord> records = new LinkedList<>();
    Record genericRecord = readRecord(reader);
    while (genericRecord != null) {
        records.add(genericRecord);
        genericRecord = readRecord(reader);
    }
    return records;
}

private static Record readRecord(ParquetReader<Record> reader) {
    return Try.of(reader::read).get();
}

The issue appears when I try to make Connect data from the records using io.confluent.connect.avro.AvroData.toConnectData(avroSchema, avroValue).
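A minimal sketch of the failing call (it mirrors the Value.forEach frame in the stack trace below; the cache size, method, and variable names are my own, only toConnectData comes from the real API):

import io.confluent.connect.avro.AvroData;
import io.vavr.collection.Seq;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.SchemaAndValue;

private static void toConnect(Seq<GenericRecord> records) {
    // 100 is the schema cache size.
    AvroData avroData = new AvroData(100);
    records.forEach(record -> {
        // Throws DataException on the decimal "fixed" field shown below.
        SchemaAndValue connect = avroData.toConnectData(record.getSchema(), record);
    });
}

// Usage (the path is illustrative):
// toConnect(genericRecordsOf(io.vavr.collection.List.of("s3a://bucket/file.parquet")));

The exception: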


Exception in thread "main" org.apache.kafka.connect.errors.DataException: Mismatched names: name already added to SchemaBuilder (org.apache.kafka.connect.data.Decimal) differs from name in source schema (cedingrate)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1969)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
        at io.confluent.connect.avro.AvroData.toConnectSchemaWithCycles(AvroData.java:2000)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1836)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1803)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1645)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1326)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1307)
        at com.tryg.data.ingestors.guidewire.factories.SchemaTest.lambda$main$0(SchemaTest.java:103)
        at io.vavr.Value.forEach(Value.java:340)
        at com.tryg.data.ingestors.guidewire.factories.SchemaTest.main(SchemaTest.java:102)

and the Avro schema generated by AvroParquetReader is (fragment):

"type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "cedingrate",
      "type": [
        "null",
        {
          "type": "fixed",
          ***"name": "cedingrate",
          "size": 16,
          "logicalType": "decimal",
          "precision": 38,
          "scale": 6
        }
      ],
      "default": null
    },
...

I have debugged into the AvroData code and found that the issue is that AvroData expects "name": "org.apache.kafka.connect.data.Decimal" instead of "name": "cedingrate" in the place marked with ***. Cedingrate is the field name in the Parquet file. The Parquet files are generated by the Guidewire cloud data access framework.
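For comparison, a fixed schema shaped like the one below would apparently satisfy AvroData at the marked spot (hand-written from the error message, so treat it as an illustration rather than real output):

{
  "type": "fixed",
  "name": "org.apache.kafka.connect.data.Decimal",
  "size": 16,
  "logicalType": "decimal",
  "precision": 38,
  "scale": 6
}

I don't know where to look next. Any tips appreciated. Regards, Dawid.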


Solution

  • TL;DR: use the io.confluent.connect.avro.AvroData class from the kafka-connect-avro-data:6.2.6 dependency to convert Avro to Connect data (a Maven snippet pinning this version is shown at the end of this answer).

    The solution above works with the io.confluent.connect.avro.AvroData class as delivered in the kafka-connect-avro-data Maven dependency up to version 6.2.6. Starting with version 7.0.0, Avro fixed schemas converted to Connect schemas by io.confluent.connect.avro.AvroData are treated as FIXED-type schemas; before 7.0.0 they are treated as BYTES-type schemas.

    There is some kind of incompatibility between the parquet-avro dependency (the way AvroParquetReader builds fixed schemas from Parquet files) and kafka-connect-avro-data (the way AvroData converts schemas from Avro to Connect). I wasn't able to determine which of these components behaves according to the Avro schema specification.
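    For reference, pinning the working version with Maven would look like the snippet below (an illustration; it assumes the Confluent artifact repository, https://packages.confluent.io/maven/, is configured in the build):

      <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-connect-avro-data</artifactId>
        <version>6.2.6</version>
      </dependency>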