Tags: spring-boot, apache-kafka, spring-kafka, avro, confluent-schema-registry

Avro serialization problem in case of optional logicalType


I don't have any experience with Kafka and the related technologies, so I need your help.

I am facing the following problem:

In my current project we use the Confluent Platform (Kafka + Schema Registry). For serialization/deserialization we decided to use Avro.

I created a Spring Boot application which acts as both a producer and a consumer of Kafka messages.

The problem occurs when I try to send a Kafka message which contains the field shippedAt. In the schema it is defined as an optional field (default: null).

When I try to send the message to Kafka, the producer throws an exception: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z

Do you have similar experience? Or is there a better way to work with timestamps?

Schema for the message (the field shippedAt is optional):

{
  "namespace": "sk.kubbo.eventhubpoc.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "product",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "shippedAt",
      "type": [
        "null",
        {
          "type": "long",
          "logicalType": "timestamp-millis"
        }
      ],
      "default": null
    }
  ]
}
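For context, the timestamp-millis logical type stores the instant as epoch milliseconds in the underlying long, and Avro's TimeConversions.TimestampMillisConversion is what maps that long to and from java.time.Instant. The mapping itself is trivial; a minimal JDK-only sketch (independent of Avro, class and method names are illustrative):

```java
import java.time.Instant;

public class TimestampMillisDemo {

    // What TimestampMillisConversion does conceptually:
    // Instant <-> epoch milliseconds stored in the Avro long.
    public static long toAvroLong(Instant instant) {
        return instant.toEpochMilli();
    }

    public static Instant fromAvroLong(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        Instant shippedAt = Instant.parse("2023-12-18T10:40:01.594Z");
        long encoded = toAvroLong(shippedAt);
        System.out.println(encoded);              // 1702896001594
        System.out.println(fromAvroLong(encoded)); // 2023-12-18T10:40:01.594Z
    }
}
```

Without such a conversion registered, the writer only knows the raw long representation, which is why it cannot match a java.time.Instant against the union branches.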

Full stacktrace:

org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:947) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:440) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:906) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:307) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:157) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:108) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:92) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:159) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:192) ~[kafka-avro-serializer-7.5.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:162) ~[kafka-avro-serializer-7.5.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.5.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000) ~[kafka-clients-3.6.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947) ~[kafka-clients-3.6.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:787) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:756) ~[spring-kafka-3.1.0.jar:3.1.0]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:545) ~[spring-kafka-3.1.0.jar:3.1.0]
    at sk.kubbo.eventhubpoc.producer.EventProducer.sendMessage(EventProducer.java:37) ~[classes/:na]
    at sk.kubbo.eventhubpoc.producer.EventProducer.createOrderEvent(EventProducer.java:32) ~[classes/:na]
    at sk.kubbo.eventhubpoc.controller.ProducerController.produceOrder(ProducerController.java:24) ~[classes/:na]
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:254) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:182) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:917) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:829) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.16.jar:6.0]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.1.1.jar:6.1.1]
    at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.16.jar:6.0]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.zalando.logbook.servlet.LogbookFilter.doFilter(LogbookFilter.java:76) ~[logbook-servlet-3.7.2.jar:na]
    at org.zalando.logbook.servlet.HttpFilter.doFilter(HttpFilter.java:32) ~[logbook-servlet-3.7.2.jar:na]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.1.1.jar:6.1.1]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:340) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

It works correctly if the value of shippedAt is null. It also works correctly when I modify the schema and make the field shippedAt mandatory (see the snippet below).

{
  "namespace": "sk.kubbo.eventhubpoc.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "orderId",
      "type": "string"
    },
    {
      "name": "product",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "shippedAt",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

For generating the Java classes I use the Maven plugin org.apache.avro:avro-maven-plugin:1.11.3, configured like this:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.11.3</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/avro/event</sourceDirectory>
        <createSetters>false</createSetters>
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
        <fieldVisibility>private</fieldVisibility>
    </configuration>
</plugin>

I noticed that when I generate the Java class from the second schema (where the field is mandatory), the generated class contains the getConversion method shown below; it is missing from the class generated when the field is optional.

  private static final org.apache.avro.Conversion<?>[] conversions =
      new org.apache.avro.Conversion<?>[] {
      null,
      null,
      null,
      new org.apache.avro.data.TimeConversions.TimestampMillisConversion(),
      null
  };

  @Override
  public org.apache.avro.Conversion<?> getConversion(int field) {
    return conversions[field];
  }

It looks like it can be caused by having multiple event types in one topic. I implemented it as described in the article https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/. When I use my Order class in a topic that allows only Order messages, it works.

I am really desperate now: we need multiple event types in one topic, because we have to preserve the order of events.


Solution

  • OK, I managed to solve the issue.

    I found information (Writing Avro From Spark to Kafka) that this is a bug in the Avro dependencies. That page also linked to a Stack Overflow thread where I found the solution.

    I added the following line in the application class, and now it works correctly:

    SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
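
    For completeness, this registration only needs to happen once at startup, before the first message is produced. A minimal sketch of how the wiring might look in the Spring Boot application class (the class name here is illustrative; only the addLogicalTypeConversion call comes from the actual fix):

    ```java
    import org.apache.avro.data.TimeConversions;
    import org.apache.avro.specific.SpecificData;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class EventHubPocApplication {

        public static void main(String[] args) {
            // Register the Instant <-> long (timestamp-millis) conversion on the
            // global SpecificData model, so the serializer can resolve the
            // ["null", long timestamp-millis] union for java.time.Instant values.
            SpecificData.get().addLogicalTypeConversion(
                    new TimeConversions.TimestampMillisConversion());

            SpringApplication.run(EventHubPocApplication.class, args);
        }
    }
    ```

    As an alternative worth checking: recent versions of the Confluent Avro serializer reportedly expose a producer property avro.use.logical.type.converters=true that registers these logical-type conversions automatically, without code changes; verify against the serializer version in use.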