Search code examples
apache-kafkaconfluent-platformksqldb

Why is ksqldb-server crashing when upgrading from v0.6.0 to v0.8.1?


I have an existing ksqldb-server that I want to upgrade from v0.6.0 with connect as a separated node, to v0.8.1 with embedded connect. However, ksqldb-server crashes after starting with the following log:

ksqldb-server      | [2020-04-07 07:24:37,266] ERROR Error during restore (io.confluent.ksql.rest.server.computation.CommandRunner:191)
ksqldb-server      | org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
ksqldb-server      | Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      | Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])
ksqldb-server      |  at [Source: (byte[])"{"statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","useOffsetAsQueryID":true,"streamsProperties":{},"originalProperties":{"ksql.exten"[truncated 1813 bytes]; line: 1, column: 2314] (through reference chain: io.confluent.ksql.rest.server.computation.Command["useOffsetAsQueryID"])
ksqldb-server      |    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
ksqldb-server      |    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1206)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:504)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
ksqldb-server      |    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
ksqldb-server      |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3266)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.InternalTopicSerdes$InternalTopicDeserializer.deserialize(InternalTopicSerdes.java:59)
ksqldb-server      |    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1309)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1540)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1376)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
ksqldb-server      |    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
ksqldb-server      |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
ksqldb-server      |    at io.confluent.ksql.rest.server.CommandTopic.getRestoreCommands(CommandTopic.java:88)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandStore.getRestoreCommands(CommandStore.java:211)
ksqldb-server      |    at io.confluent.ksql.rest.server.computation.CommandRunner.processPriorCommands(CommandRunner.java:155)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.initialize(KsqlRestApplication.java:424)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startKsql(KsqlRestApplication.java:354)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlRestApplication.startAsync(KsqlRestApplication.java:338)
ksqldb-server      |    at io.confluent.ksql.rest.server.ExecutableServer.startAsync(ExecutableServer.java:47)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.doAction(MultiExecutable.java:63)
ksqldb-server      |    at io.confluent.ksql.rest.server.MultiExecutable.startAsync(Mult iExecutable.java:42)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:75)
ksqldb-server      |    at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:61)
ksqldb-server      | [2020-04-07 07:24:37,286] INFO Server shutting down (io.confluent.ksql.rest.server.KsqlServerMain:79)

The key message here is: Unrecognized field "useOffsetAsQueryID" (class io.confluent.ksql.rest.server.computation.Command), not marked as ignorable (4 known properties: "plan", "originalProperties", "streamsProperties", "statement"])

The only resource I could find is this feature but it was already in v0.6.0: https://github.com/confluentinc/ksql/pull/3343

Any idea? Thanks for your help.


Solution

  • There have been breaking changes from 0.6 to 0.8.1. The error you see is because KSQL 0.8.1 cannot use the 0.6 streams read from the command topic. To fix it, follow these instructions for upgrading: https://github.com/confluentinc/ksql/blob/master/docs/operate-and-deploy/installation/upgrading.md

    It would be easier if you can go back to your 0.6 version to get all streams/tables/types from KSQL, then re-create them in 0.8.1.