Tags: apache-kafka, avro, apache-kafka-connect, confluent-schema-registry

Kafka-connect to PostgreSQL - org.apache.kafka.connect.errors.DataException: Failed to deserialize topic to Avro


Setup

I've installed the latest version (7.0.1) of the Confluent Platform in standalone mode on an Ubuntu virtual machine.

Python producer for Avro format

I'm using this sample Avro producer to stream data to a Kafka topic (pmu214).

The producer seems to work fine; I'll provide the full code on request. Producer output:

Raw data: {"pmu_id": 2, "time": 1644577001.22, "measurements": [{"stream_id": 2, "stat": "ok", "phasors": [[27.22379, 0.0], [24.672079638784002, -2.075237618663568], [25.11940552135938, 2.10660756475536], [3248.794237867336, -0.06468446412011757], [3068.6629010042793, -2.152472189017548], [2990.0809353594427, 2.031751749658583], [0.0, 0.0], [3101.9477751890026, -0.06193618455080409]], "analog": [], "digital": [0], "frequency": 50.022, "rocof": 0}]}
PMU record b'e5c1e5a8-3e44-465d-98c4-93f896ec1b14' successfully produced to pmu214 [0] at offset 38256
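For reference, here is a condensed sketch of what such a producer typically looks like with confluent-kafka-python. The schema, field names, and wiring below are assumptions for illustration, not the actual code:

from uuid import uuid4

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

# Hypothetical, simplified value schema; the real PMU schema has nested records.
schema_str = """
{
  "type": "record",
  "name": "PmuSample",
  "fields": [
    {"name": "pmu_id", "type": "int"},
    {"name": "time", "type": "double"},
    {"name": "frequency", "type": "double"}
  ]
}
"""

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    # The key is a plain UTF-8 string (a UUID), NOT Avro -- this matters later.
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": avro_serializer,
})

producer.produce(
    topic="pmu214",
    key=str(uuid4()),
    value={"pmu_id": 2, "time": 1644577001.22, "frequency": 50.022},
)
producer.flush()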

KSQL

In ksqlDB it seems that the record reached the topic OK:

rowtime: 2022/02/11 09:48:05.431 Z, key: [26151234-d3dd-4b7c-9222-2867@3486128305426751843/-], value: \x00\x00\x00\x00\x01\x04\xAA\xC3\xB1\xA0\x0C\x04\x04ok\xC2p\xD9A\x0F`\x9F>.b\xC6A\xB9\xC8\xE2\xBF\xC5%\xC8A\x13v\x1A@\x8FVKE\xF9\xF8u>\xC5\xC5?E\xEA\xBA\xEC\xBF\xE0\xFA:E\xD5~\x15@\x00\x00\x00\x00\x00\x00\x00\x00\x84\x07BEs\xD1w>\x04[]\x020b\x02, partition: 0
Index 0 out of bounds for length 0
Topic printing ceased
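For context, output of that shape comes from ksqlDB's PRINT statement in the ksql CLI; the exact invocation here is an assumption:

PRINT 'pmu214' FROM BEGINNING;

Note the value bytes begin with \x00\x00\x00\x00\x01: the Confluent wire format's magic byte followed by a 4-byte schema ID (here 1), so the value really is Schema Registry-encoded Avro. The key, by contrast, prints as a plain UUID-like string, which is the first hint of the problem diagnosed below.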

Kafka-connect

Here is the command used to start the connector that sinks to PostgreSQL:

bin/connect-standalone etc/kafka/connect-standalone.properties etc/schema-registry/connect-avro-standalone.properties
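For reference, connect-standalone takes the worker configuration first, followed by one or more connector configuration files, so the second file here is being used as the connector definition:

bin/connect-standalone worker.properties connector1.properties [connector2.properties ...]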

Here is the content of connect-avro-standalone.properties:

bootstrap.servers=localhost:9092
name=sinkIRIpostgre
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
topics=pmu214

connection.user=mydbuser
connection.password=mypass
auto.create=true
auto.evolve=true
insert.mode=insert
pk.mode=record_key
pk.fields=MESSAGE_KEY

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

I didn't change anything in connect-standalone.properties except the plugin path for the plugins I've installed:

plugin.path=/home/proton/kafkaConnectors/confluentinc-kafka-connect-jdbc-10.3.2,/home/proton/kafkaConverters/confluentinc-kafka-connect-json-schema-converter-7.0.1,/home/proton/kafkaConverters/confluentinc-kafka-connect-avro-converter-7.0.1/

ERROR received

ERROR [sinkIRIpostgre|task-0] WorkerSinkTask{id=sinkIRIpostgre-0} Error converting message key in topic 'pmu214' partition 0 at offset 0 and timestamp 1644560570372: Failed to deserialize data for topic pmu214 to Avro: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pmu214 to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:550)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)


Solution

  • If you literally ran the Python sample code, then the key is not Avro (the sample serializes keys as plain strings), so a failure in the key.converter is exactly what you'd expect, as the log says (see the sketch after this list):

    Error converting message key
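A minimal sketch of the fix, assuming the keys really are plain UTF-8 strings (UUIDs, as in the sample producer): switch only the key converter to StringConverter and keep the value side on Avro, since the value is genuinely registry-encoded.

key.converter=org.apache.kafka.connect.storage.StringConverter
# value stays on Avro -- only the key was never Avro
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

With pk.mode=record_key and a primitive string key, the existing pk.fields=MESSAGE_KEY should simply name the primary-key column, so that part of the config can stay as it is.

If in doubt about what was actually registered, list the Schema Registry subjects (assuming the default port 8081); the absence of a pmu214-key subject confirms the key was never produced as Avro:

curl http://localhost:8081/subjects
# e.g. ["pmu214-value"] -- no "pmu214-key" entry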