jdbc, apache-kafka-connect, avro, confluent-schema-registry

Kafka Connect JDBC Source Connector unable to write NULL record (tombstone) via SMT


I'm struggling with a JDBC source connector that uses io.confluent.connect.avro.AvroConverter and a schema in the registry. I wrote a custom SMT that lets Connect return a null value (tombstone) when one of the fields contains a configurable value.

I already wrote an SMT for a connector using io.confluent.connect.json.JsonSchemaConverter, where the same procedure works fine.

The basic idea is to populate a compacted topic not only with data but also to delete old data that is no longer used.
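For context: tombstones only lead to deletion if the target topic is log-compacted, i.e. created with cleanup.policy=compact. A minimal sketch of creating such a topic with the Kafka AdminClient (topic name, partition/replica counts and bootstrap address are placeholders, not taken from my setup):

  import java.util.Collections;
  import java.util.Map;
  import java.util.Properties;

  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.admin.NewTopic;
  import org.apache.kafka.common.config.TopicConfig;

  public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

      try (AdminClient admin = AdminClient.create(props)) {
        // cleanup.policy=compact is what makes a null value (tombstone) remove
        // earlier records with the same key once compaction runs
        NewTopic topic = new NewTopic("account-state", 1, (short) 1)
            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT));
        admin.createTopics(Collections.singleton(topic)).all().get();
      }
    }
  }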

The error I get is: org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema

I tried making the whole Avro schema nullable, and I also tried making every element in the schema nullable, but neither works.
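To be clear, by "nullable" I mean unions with null; expressed with Avro's SchemaBuilder that would look roughly like this (record name and field types are only illustrative, not my actual schema):

  import org.apache.avro.Schema;
  import org.apache.avro.SchemaBuilder;

  // Every field becomes a union of null and its type, with a null default
  Schema valueSchema = SchemaBuilder.record("AccountValue").fields()
      .optionalString("ACCOUNT")
      .optionalString("ACCOUNT_STATUS")
      .endRecord();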

The relevant part of the connect config would be:

  "transforms": "createKey, extractKey, selectValues",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "ACCOUNT",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "ACCOUNT",
  "transforms.selectValues.type": "my.own.smt.ProduceNullRecordIfFieldEquals",
  "transforms.selectValues.checkField": "ACCOUNT_STATUS",
  "transforms.selectValues.checkContent": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "${env:SCHEMA_REGISTRY_URL}",
  "value.converter.auto.register.schemas": "true",
  "value.converter.compatibility.strict": "false"

The main part of the SMT looks like this:


  public R apply(R record) {
    Struct origStruct = Requirements.requireStruct(record.value(), "flat");
    Struct targetStruct;

    // Null out the value when the configured field matches the configured content
    if (String.valueOf(origStruct.get(this.checkField)).equals(this.checkContent)) {
      targetStruct = null;
    } else {
      targetStruct = origStruct;
    }

    // Note: the original (non-optional) value schema is passed on unchanged
    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        record.valueSchema(),
        targetStruct,
        record.timestamp());
  }
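For context, the surrounding Transformation boilerplate that wires checkField and checkContent up to the connector config looks roughly like this (the ConfigDef details below are a sketch, not my exact implementation):

  import java.util.Map;

  import org.apache.kafka.common.config.AbstractConfig;
  import org.apache.kafka.common.config.ConfigDef;
  import org.apache.kafka.connect.connector.ConnectRecord;
  import org.apache.kafka.connect.transforms.Transformation;

  public class ProduceNullRecordIfFieldEquals<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("checkField", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Name of the value field to inspect")
        .define("checkContent", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "If the field equals this value, the record value is turned into a tombstone");

    private String checkField;
    private String checkContent;

    @Override
    public void configure(Map<String, ?> configs) {
      AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
      this.checkField = config.getString("checkField");
      this.checkContent = config.getString("checkContent");
    }

    @Override
    public ConfigDef config() {
      return CONFIG_DEF;
    }

    @Override
    public void close() {
      // nothing to clean up
    }

    @Override
    public R apply(R record) {
      // body as shown above: replace the value with null when checkField equals checkContent
      return record; // placeholder
    }
  }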

And the line of code in the AvroData class that I have to get past is here: [Confluent Connect AvroData][1]

if (value == null && schema != null && !schema.isOptional()) {... log error ...}

If I could set the schema to optional, it should work, but I have not found a way to do that yet.

Any help is appreciated!

BR

[1]: https://github.com/confluentinc/schema-registry/blob/master/avro-data/src/main/java/io/confluent/connect/avro/AvroData.java#L1368


Solution

  • I found a solution in the SMT

    It seems that the AvroConverter is stricter than the JsonSchemaConverter.
    For a null value, the schema attached to the record must match the null value, i.e. it has to be optional, in order to pass the checks in the AvroData class.

      public R apply(R record) {
        Struct origStruct = Requirements.requireStruct(record.value(), "flat");
    
        Schema newSchema;
        Struct targetStruct;
    
        if (String.valueOf(origStruct.get(this.checkField)).equals(this.checkContent)) {
          // Tombstone: null value plus an *optional* struct schema so AvroData accepts it
          targetStruct = null;
          newSchema = SchemaBuilder.struct().optional().build();
        } else {
          // Keep the record as-is, including its original schema
          targetStruct = origStruct;
          newSchema = targetStruct.schema();
        }
    
        return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            record.key(),
            newSchema,
            targetStruct,
            record.timestamp());
      }
    

    It still seems wrong to me as a final solution, but it works for the moment.