apache-kafka confluent-platform ksqldb

ksqlDB server fails to start when connecting to an existing Kafka broker


I have an existing Kafka broker at <myurl>:9092, running Apache Kafka 2.2.0. I would like to use ksqlDB to do some stream processing on data from topics on that broker. To match the broker version, I am using the ksqlDB that ships with Confluent Platform 5.2, following the compatibility table at https://docs.confluent.io/platform/current/installation/versions-interoperability.html.

I've set bootstrap.servers=<myurl>:9092 in ksql/ksql-server.properties.

However, when I try to start the ksql-server by running ksql-server-start etc/ksql/ksql-server.properties, I get the following error:

ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:53)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
...

If I use a local broker and set bootstrap.servers=localhost:9092, ksql-server starts without any problems.

How do I get around this null record/deserialization issue so that I can connect a ksqlDB server to my existing Kafka broker?


Solution

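  • As @OneCricketeer pointed out, the problem comes from a command topic that already exists on the cluster: the server fails while deserializing the record at offset 0 of _confluent-ksql-default__command_topic. The command topic name is derived from ksql.service.id (the default ID is default_), so changing ksql.service.id in the ksqlDB server properties makes the server use a fresh command topic. See Configuring ksqlDB server.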
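
A minimal sketch of that change in etc/ksql/ksql-server.properties. The service ID my_ksql_app_ is a hypothetical placeholder, not a value from the original answer; the assumption is that any ID not already in use on the cluster will do.

```properties
# Existing setting from the question: the remote Kafka broker.
bootstrap.servers=<myurl>:9092

# Hypothetical service ID (assumption: any value unused on this cluster works).
# The default is "default_", which maps to the command topic
# _confluent-ksql-default__command_topic seen in the error.
ksql.service.id=my_ksql_app_
```

With this value the server should use _confluent-ksql-my_ksql_app__command_topic instead of the existing topic. Restart it with ksql-server-start etc/ksql/ksql-server.properties as before.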