I'm trying to convert Parquet files into SourceRecords, and I'm currently stuck on generating a Connect schema from the Avro schema. I'm able to read the Parquet files into GenericRecords:
public static Seq<GenericRecord> genericRecordsOf(Seq<String> parquets) {
    Configuration config = new Configuration();
    config.setBoolean(AVRO_COMPATIBILITY, true);
    config.setBoolean("parquet.avro.add-list-element-records", false);
    config.setBoolean("parquet.avro.write-old-list-structure", false);
    config.setClass("parquet.avro.data.supplier", SchemaTest.class, AvroDataSupplier.class);
    config.set("fs.s3a.impl", S3AFileSystem.class.getCanonicalName());
    return parquets.flatMap(input -> {
        Builder<Record> builder = Try(() -> AvroParquetReader
                .<Record>builder(HadoopInputFile.fromPath(new Path(input), config)))
                .get();
        return readRecords(builder);
    });
}

private static List<GenericRecord> readRecords(Builder<Record> builder) {
    return Try
            .withResources(builder::build)
            .of(SchemaTest::readRecords)
            .get();
}

private static List<GenericRecord> readRecords(ParquetReader<Record> reader) {
    List<GenericRecord> records = new LinkedList<>();
    Record genericRecord = readRecord(reader);
    while (genericRecord != null) {
        records.add(genericRecord);
        genericRecord = readRecord(reader);
    }
    return records;
}

private static Record readRecord(ParquetReader<Record> reader) {
    return Try.of(reader::read).get();
}
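For context, an invocation looks like this (the S3 path is a hypothetical placeholder):

Seq<GenericRecord> records =
        genericRecordsOf(io.vavr.collection.List.of("s3a://my-bucket/data/file.parquet"));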
The issue arises when I try to build Connect data from it using io.confluent.connect.avro.AvroData.toConnectData(avroSchema, avroValue).
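For reference, the conversion attempt looks roughly like this (a minimal sketch reconstructed from the stack trace below; the cache size passed to the AvroData constructor is an arbitrary value):

AvroData avroData = new AvroData(100); // schema cache size, arbitrary
genericRecordsOf(parquets).forEach(record ->
        // throws the DataException below on the "cedingrate" field
        avroData.toConnectData(record.getSchema(), record));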
The exception:
Exception in thread "main" org.apache.kafka.connect.errors.DataException: Mismatched names: name already added to SchemaBuilder (org.apache.kafka.connect.data.Decimal) differs from name in source schema (cedingrate)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1969)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
at io.confluent.connect.avro.AvroData.toConnectSchemaWithCycles(AvroData.java:2000)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1836)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1803)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1645)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1326)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1307)
at com.tryg.data.ingestors.guidewire.factories.SchemaTest.lambda$main$0(SchemaTest.java:103)
at io.vavr.Value.forEach(Value.java:340)
at com.tryg.data.ingestors.guidewire.factories.SchemaTest.main(SchemaTest.java:102)
The Avro schema generated by AvroParquetReader is (fragment):
"type": "record",
"name": "spark_schema",
"fields": [
{
"name": "cedingrate",
"type": [
"null",
{
"type": "fixed",
***"name": "cedingrate",
"size": 16,
"logicalType": "decimal",
"precision": 38,
"scale": 6
}
],
"default": null
},
...
I have debugged into the AvroData code and found that AvroData expects "name": "org.apache.kafka.connect.data.Decimal" instead of "name": "cedingrate" in the place marked with *** above.
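In other words, judging from the debugger, AvroData would accept the fixed schema only if it were shaped like this (a reconstruction based on the debugging above, not the output of any actual tool):

{
  "type": "fixed",
  "name": "org.apache.kafka.connect.data.Decimal",
  "size": 16,
  "logicalType": "decimal",
  "precision": 38,
  "scale": 6
}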
cedingrate is the field name in the Parquet file. The Parquet files are generated by the Guidewire Cloud Data Access framework.
I don't know where to look further.
Any tips appreciated.
Regards, Dawid.
TL;DR: use the io.confluent.connect.avro.AvroData class from the kafka-connect-avro-data:6.2.6 dependency to convert Avro to Connect data.
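Assuming the standard Confluent Maven coordinates (groupId io.confluent, served from the Confluent Maven repository), pinning the dependency would look like:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-connect-avro-data</artifactId>
    <version>6.2.6</version>
</dependency>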
The above solution works with the io.confluent.connect.avro.AvroData class delivered in the kafka-connect-avro-data Maven dependency up to version 6.2.6. Starting with version 7.0.0, Avro fixed schemas converted to Connect schemas by io.confluent.connect.avro.AvroData are treated as FIXED-type schemas; before 7.0.0 they were treated as BYTES-type schemas.
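With that version in place, the end-to-end conversion the question asks about can be sketched as follows (the topic name, source partition, and source offset values are hypothetical placeholders, and the AvroData cache size is arbitrary):

import io.confluent.connect.avro.AvroData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;

public class ParquetToSourceRecord {
    private static final AvroData AVRO_DATA = new AvroData(100); // schema cache size, arbitrary

    static SourceRecord toSourceRecord(GenericRecord record) {
        // Convert the Avro schema and value to their Connect equivalents.
        SchemaAndValue connect = AVRO_DATA.toConnectData(record.getSchema(), record);
        return new SourceRecord(
                Map.of("source", "parquet"), // hypothetical source partition
                Map.of("position", 0L),      // hypothetical source offset
                "my-topic",                  // hypothetical topic name
                connect.schema(),
                connect.value());
    }
}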
There is some kind of incompatibility between the parquet-avro dependency (the way AvroParquetReader builds fixed schemas from Parquet files) and kafka-connect-avro-data (the way AvroData converts Avro schemas to Connect schemas). I wasn't able to determine which of these components behaves according to the Avro specification.