I don't have any experience with kafka and related technology, so I need your help.
I am facing following problem:
In my current project we use confluent platform (kafka + schema registry). For serialization/deserialization we decided to use AVRO.
I created springboot application which behaves as producer and also consumer of kafka messages.
I have a problem, when i try to send kafka message which contains field shippedAt
... In the schema it is defined as a optional field (default: null)
When I try to send message into kafka, producer throws an exception: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z
Do you have similar experience? Or is there a better way how to work with timestamps?
Schema for message (field is mandatory):
{
"namespace": "sk.kubbo.eventhubpoc.avro",
"type": "record",
"name": "Order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "product",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "shippedAt",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
Full stacktrace:
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2023-12-18T10:40:01.594Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:947) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:440) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:906) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:307) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:157) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:108) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:92) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:159) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95) ~[avro-1.11.3.jar:1.11.3]
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82) ~[avro-1.11.3.jar:1.11.3]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:192) ~[kafka-avro-serializer-7.5.1.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:162) ~[kafka-avro-serializer-7.5.1.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:68) ~[kafka-avro-serializer-7.5.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000) ~[kafka-clients-3.6.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947) ~[kafka-clients-3.6.0.jar:na]
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050) ~[spring-kafka-3.1.0.jar:3.1.0]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:787) ~[spring-kafka-3.1.0.jar:3.1.0]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:756) ~[spring-kafka-3.1.0.jar:3.1.0]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:545) ~[spring-kafka-3.1.0.jar:3.1.0]
at sk.kubbo.eventhubpoc.producer.EventProducer.sendMessage(EventProducer.java:37) ~[classes/:na]
at sk.kubbo.eventhubpoc.producer.EventProducer.createOrderEvent(EventProducer.java:32) ~[classes/:na]
at sk.kubbo.eventhubpoc.controller.ProducerController.produceOrder(ProducerController.java:24) ~[classes/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:254) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:182) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:917) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:829) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.1.1.jar:6.1.1]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.1.1.jar:6.1.1]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:590) ~[tomcat-embed-core-10.1.16.jar:6.0]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.1.1.jar:6.1.1]
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.16.jar:6.0]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.zalando.logbook.servlet.LogbookFilter.doFilter(LogbookFilter.java:76) ~[logbook-servlet-3.7.2.jar:na]
at org.zalando.logbook.servlet.HttpFilter.doFilter(HttpFilter.java:32) ~[logbook-servlet-3.7.2.jar:na]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.1.1.jar:6.1.1]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.1.1.jar:6.1.1]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:340) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:391) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1744) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-10.1.16.jar:10.1.16]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
It works correctly, if value of shippedAt
is null
. Also when i modify schema and make the field shippedAt
mandatory (see snippet bellow), it works correctly.
{
"namespace": "sk.kubbo.eventhubpoc.avro",
"type": "record",
"name": "Order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "product",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "shippedAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}
For generating java classes I use maven plugin org.apache.avro:avro-maven-plugin:1.11.3
and my settings looks like this:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.11.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/avro/event</sourceDirectory>
<createSetters>false</createSetters>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
<fieldVisibility>private</fieldVisibility>
</configuration>
</plugin>
I noticed that when I generate java class from schema2 (field is mandatory) in the generated class there is method getConversion (this is missing in the generated class when field is optional)
private static final org.apache.avro.Conversion<?>[] conversions =
new org.apache.avro.Conversion<?>[] {
null,
null,
null,
new org.apache.avro.data.TimeConversions.TimestampMillisConversion(),
null
};
@Override
public org.apache.avro.Conversion<?> getConversion(int field) {
return conversions[field];
}
It looks that it can be caused by having multiple event types in one topic. I implemented it like in the article https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/
When I use my order
class in topic which allows only order
messages, it works.
I am really desperate now, since we need to have multiple event types in one topic, because we have to preserve order of events.
OK, I managed to solve the issue.
I found an information (Writing Avro From Spark to Kafka) that it is a bug in avro dependencies. On this page was also a link to StackOverflow thread where I found the solution
I added following line in the application class and now it works correctly
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());