ksqlDB: write a custom UDAF with struct types (AnnotationParser exception)


I'm trying to write a custom UDAF for ksqlDB.

First I wanted to try out the example from the documentation: https://docs.ksqldb.io/en/latest/how-to-guides/create-a-user-defined-function/#implement-the-class

The schemas of the structs are submitted via schema descriptors. According to the guide: "This communicates the underlying types to ksqlDB in a way that its type system can understand." For example:

    @UdafFactory(description = "descriptionOfFunction",
                 paramSchema = PARAM_SCHEMA_DESCRIPTOR,
                 aggregateSchema = AGGREGATE_SCHEMA_DESCRIPTOR,
                 returnSchema = RETURN_SCHEMA_DESCRIPTOR)

When I build the shadowJar and place it in ksqlDB's extension directory without the schemas (param, aggregate, and return), ksqlDB starts up without errors and the function shows up. I need to use structs, though, so I have to provide schemas to ksqlDB. When I add the schema descriptors to the UDAF factory, I get a NullPointerException thrown from Java's AnnotationParser. The schema builders and descriptors, the Gradle dependencies, and the logs are below:

    public static final Schema PARAM_SCHEMA = SchemaBuilder.struct().optional()
        .field("C", Schema.OPTIONAL_INT64_SCHEMA)
        .build();

    public static final String PARAM_SCHEMA_DESCRIPTOR = "STRUCT<" +
        "C BIGINT" +
        ">";

    public static final Schema AGGREGATE_SCHEMA = SchemaBuilder.struct().optional()
        .field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
        .field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
        .field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)
        .build();

    public static final String AGGREGATE_SCHEMA_DESCRIPTOR = "STRUCT<" +
        "MIN BIGINT," +
        "MAX BIGINT," +
        "COUNT BIGINT" +
        ">";

    public static final Schema RETURN_SCHEMA = SchemaBuilder.struct().optional()
        .field("MIN", Schema.OPTIONAL_INT64_SCHEMA)
        .field("MAX", Schema.OPTIONAL_INT64_SCHEMA)
        .field("COUNT", Schema.OPTIONAL_INT64_SCHEMA)
        .field("DIFFERENTIAL", Schema.OPTIONAL_INT64_SCHEMA)
        .build();

    public static final String RETURN_SCHEMA_DESCRIPTOR = "STRUCT<" +
        "MIN BIGINT," +
        "MAX BIGINT," +
        "COUNT BIGINT," +
        "DIFFERENTIAL BIGINT" +
        ">";

(I just copied the class from the example)
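
The descriptor strings above have to list the same field names, in the same order, as the corresponding SchemaBuilder definitions. As a minimal sketch of one way to check that, the hypothetical helper below (structDescriptor is not part of ksqlDB) assembles a descriptor from an ordered map of field names to SQL type names. Annotation members must be compile-time constants, so its result cannot be passed to @UdafFactory directly; it is meant for a test that compares it against the hand-written constant.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class StructDescriptors {

    // Hand-written constant, copied from the question.
    static final String AGGREGATE_SCHEMA_DESCRIPTOR = "STRUCT<" +
        "MIN BIGINT," +
        "MAX BIGINT," +
        "COUNT BIGINT" +
        ">";

    // Hypothetical helper (not a ksqlDB API): joins "NAME TYPE" pairs,
    // in map iteration order, into STRUCT<NAME TYPE,NAME TYPE,...>.
    static String structDescriptor(Map<String, String> fields) {
        return fields.entrySet().stream()
            .map(e -> e.getKey() + " " + e.getValue())
            .collect(Collectors.joining(",", "STRUCT<", ">"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which fixes the field order.
        Map<String, String> aggregate = new LinkedHashMap<>();
        aggregate.put("MIN", "BIGINT");
        aggregate.put("MAX", "BIGINT");
        aggregate.put("COUNT", "BIGINT");

        String generated = structDescriptor(aggregate);
        System.out.println(generated);
        System.out.println("matches constant: "
            + generated.equals(AGGREGATE_SCHEMA_DESCRIPTOR));
    }
}
```

A check like this only guards against typos in the descriptor text; it does not address the exception itself.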

Error log of ksqlDB:

    java.lang.NullPointerException
        at java.base/sun.reflect.annotation.AnnotationParser.parseArray(AnnotationParser.java:533)
        at java.base/sun.reflect.annotation.AnnotationParser.parseMemberValue(AnnotationParser.java:356)
        at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotation2(AnnotationParser.java:287)
        at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations2(AnnotationParser.java:121)
        at java.base/sun.reflect.annotation.AnnotationParser.parseAnnotations(AnnotationParser.java:73)
        at java.base/java.lang.reflect.Executable.declaredAnnotations(Executable.java:604)
        at java.base/java.lang.reflect.Executable.declaredAnnotations(Executable.java:602)
        at java.base/java.lang.reflect.Executable.getAnnotation(Executable.java:572)
        at java.base/java.lang.reflect.Method.getAnnotation(Method.java:695)
        at io.confluent.ksql.function.UdafLoader.loadUdafFromClass(UdafLoader.java:59)
        at io.confluent.ksql.function.UserFunctionLoader.loadFunctions(UserFunctionLoader.java:124)
        at io.confluent.ksql.function.UserFunctionLoader.lambda$load$2(UserFunctionLoader.java:97)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at io.confluent.ksql.function.UserFunctionLoader.load(UserFunctionLoader.java:96)
        at io.confluent.ksql.rest.server.KsqlServerMain.loadFunctions(KsqlServerMain.java:133)
        at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:80)

My Gradle build file has the following dependencies:

    dependencies {
        implementation "io.confluent.ksql:ksqldb-udf:7.3.0"
        implementation "org.apache.kafka:kafka_2.13:2.5.0"
        implementation "org.apache.kafka:connect-api:3.3.1"
    }

I really don't know why ksqlDB can't parse the schema (it's an example class provided by ksqlDB)...


Solution

  • I was able to register the UDAF with the param, aggregate, and return schemas, without the AnnotationParser exception, after downgrading from:

        implementation "io.confluent.ksql:ksqldb-udf:7.3.0"
    

    to:

        implementation "io.confluent.ksql:ksqldb-udf:5.5.1"
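
The answer does not say why the downgrade helps. One possible reading, which is an assumption and not confirmed here, is that the annotation definition loaded by the server differs from the one the jar was compiled against. A rough way to compare the two is to list an annotation's members and their declared types with plain reflection. In the sketch below, DemoFactory is a hypothetical stand-in so the code runs without ksqlDB on the classpath, and memberTypes is an illustrative helper, not a ksqlDB API.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationMembers {

    // Hypothetical stand-in annotation; its members mirror the ones
    // used in the question but it is NOT the real ksqlDB annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoFactory {
        String description();
        String paramSchema() default "";
        String aggregateSchema() default "";
        String returnSchema() default "";
    }

    // Maps each member name of an annotation type to the simple name
    // of its declared type (for example String, or String[] for arrays).
    static Map<String, String> memberTypes(Class<?> annotationType) {
        Map<String, String> types = new TreeMap<>();
        for (Method m : annotationType.getDeclaredMethods()) {
            types.put(m.getName(), m.getReturnType().getSimpleName());
        }
        return types;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // With an argument, inspect that class from the current classpath;
        // without one, fall back to the stand-in annotation.
        Class<?> type = args.length > 0
            ? Class.forName(args[0])
            : DemoFactory.class;
        memberTypes(type).forEach(
            (name, t) -> System.out.println(name + " : " + t));
    }
}
```

Running this once with each ksqldb-udf jar on the classpath, passing the fully qualified name of the factory annotation as the argument, would show whether the member types differ between the two versions.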