
How to use Avro Generated Classes with children as UNION in Flink Operator


I am trying to build a list of Avro-generated records while iterating with a .map() over Kafka records pulled from a Kafka source.

The problem I'm having is that I have to handle multiple types of events on that Kafka topic, so I ended up with a GenericType (an Avro-generated object) whose schema has a UNION on one field ('data').

While processing those records and trying to build the result, I debugged into the PojoType validation phase: since the class GenericType has a child declared as a UNION, that field is generated as: private java.lang.Object data;

While processing this field, the PojoType validator throws an exception:

Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo

My generated GenericType Java class does explicitly extend org.apache.avro.specific.SpecificRecordBase, but the problem remains, because its 'data' field is of type java.lang.Object.

Here is the code that is causing issues:

SingleOutputStreamOperator<GenericType> producedRecords =
      kafkaRecords
          .map(
              value -> {
                String kafkaKey = value.get(KEY).asText();
                String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
                return (GenericType) Converter.convert(kafkaKey, kafkaRecordJson);
              })
          .returns(
              TypeInformation.of(
                  new TypeHint<GenericType>() {
                    @Override
                    public TypeInformation<GenericType> getTypeInfo() {
                      return super.getTypeInfo();
                    }
                  }));
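
For reference, flink-avro also ships org.apache.flink.formats.avro.typeutils.AvroTypeInfo for classes extending SpecificRecordBase. Below is a minimal sketch of handing that to .returns() instead of a TypeHint (reusing GenericType, Converter, MAPPER, KEY and VALUE from above, and assuming flink-avro is on the classpath); it is a sketch, not a definitive fix:

import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

// Sketch: pass the Avro-specific type info directly so Flink treats GenericType
// as an Avro specific record rather than analyzing it as a plain POJO.
SingleOutputStreamOperator<GenericType> producedRecords =
    kafkaRecords
        .map(value -> (GenericType) Converter.convert(
            value.get(KEY).asText(),
            MAPPER.writeValueAsString(value.get(VALUE))))
        .returns(new AvroTypeInfo<>(GenericType.class));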

Avro Schemas:

{
  "type": "record",
  "name": "GenericType",
  "namespace": "com.zzz.yyy",
  "fields": [
    {
      "name": "data",
      "type": [
        "com.zzz.yyy.Schema1",
        "com.zzz.yyy.Schema2"
      ]
    }
  ]
}

I have also tried with an Avro schema looking like this:

[
    "com.zzz.yyy.Schema1",
    "com.zzz.yyy.Schema2"
]

So that is just a UNION for the generic type object, but I can't get the Avro plugin that generates the classes to accept it; it always states that the schema is invalid.

The first schema generates a Java object looking like this (with the boilerplate code Avro adds stripped out). Worth mentioning: the class below does extend SpecificRecordBase, just to rule that out as the cause of the exception.

public class GenericType {
    // boiler plate here
    private java.lang.Object data;
    // boiler plate here
}

And this is the actual problem: while debugging, as I said, the validator checks the fields of the object, and the 'data' field fails because it is neither a primitive nor a valid POJO type. Being a plain java.lang.Object, it does not satisfy some of the rules (a no-args constructor, getters, setters, etc.).

I'm trying to figure out how to generate those Avro objects, or what I could use instead of the generic one inside my job, so that I can get past that exception. Honestly, looking at that validation, I'm not sure how this would be possible, since the Avro plugin will always generate a java.lang.Object field for a UNION.
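
For context, Flink accepts a class as a POJO only if it is public, has a public no-args constructor, and every non-public field has a matching public getter and setter; a field typed java.lang.Object can still pass as long as those accessors exist (it is then handled as a generic field). A minimal sketch of the shape the generated class therefore needs (names taken from the question, SpecificRecordBase methods left out):

// Sketch of the field/accessor shape Flink's POJO analysis is looking for.
// The real generated class also extends SpecificRecordBase and implements
// getSchema()/get()/put(); only the parts relevant to the POJO rules are shown.
public class GenericType {
    private java.lang.Object data;

    public GenericType() {}                                        // public no-args constructor

    public java.lang.Object getData() { return data; }             // getter required
    public void setData(java.lang.Object value) { data = value; }  // setter required
}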

More Context:

  • Avro schemas registered with schema-registry.
  • Produced Avro objects sent to a Kafka sink (rough wiring sketched below).
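
For completeness, a rough sketch of how such a specific record can be wired to a registry-aware Kafka sink, using ConfluentRegistryAvroSerializationSchema from flink-avro-confluent-registry; the broker address, topic, subject and registry URL below are placeholders, not my actual values:

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

// Sketch: serialize GenericType against the schema registry and hand it to a Kafka sink.
KafkaSink<GenericType> sink =
    KafkaSink.<GenericType>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(
                    ConfluentRegistryAvroSerializationSchema.forSpecific(
                        GenericType.class, "output-topic-value", "http://schema-registry:8081"))
                .build())
        .build();

producedRecords.sinkTo(sink);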

Solution

  • It was just a silly problem: the Maven plugin used to generate the Avro classes had a flag set to not create setters. After enabling setter generation, all validation passed on the Avro POJOs and the flow was successful (relevant plugin configuration sketched below).
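
For reference, the avro-maven-plugin exposes this as the createSetters option (true by default); a minimal sketch of the relevant configuration, with the plugin version and directories as placeholders:

<!-- Sketch: re-enable setter generation so the Avro classes satisfy Flink's POJO rules. -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.3</version>
  <executions>
    <execution>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
        <createSetters>true</createSetters>
      </configuration>
    </execution>
  </executions>
</plugin>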