Deserializing Avro message


I deployed Kafka from here. I also added a Postgres container to docker-compose.yml like this:

postgres:
    image: postgres
    hostname: kafka-postgres
    container_name: kafka-postgres
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    ports:
      - 5432:5432

I created a topic named pageviews.

Then I created a DatagenConnector with the following settings and ran it:

{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
} 

As far as I can see the connector defined a schema for the topic:

{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
} 

My next step was to create a JdbcSinkConnector to transfer data from the Kafka topic to a Postgres table. That worked. The connector settings:

{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}

Then I tried to send messages to that topic myself, but it failed with this error:

[2020-02-01 21:16:11,750] ERROR Error encountered in task to-pg-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='pageviews', partition=0, offset=23834, timestamp=1580591160374, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

So how the message is sent matters. This is how I send it (Python, confluent-kafka-python):

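import json
from confluent_kafka import Producer

# topic and kafka_delivery_report are not shown here
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
# The value is sent as plain UTF-8 JSON bytes
producer.produce(topic, json.dumps({
    'viewtime': 123,
    'userid': 'user_1',
    'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()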

Should I provide a schema with the message (AvroProducer)?


Solution

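  • The topic expects messages in Avro format. The "Unknown magic byte!" error means the value was not serialized through Schema Registry, so the AvroConverter could not find the schema ID it expects at the start of each message.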

    AvroProducer from confluent-kafka-python does the trick:

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
       "namespace": "ksql",
       "name": "value",
       "type": "record",
       "fields" : [
         {
           "name" : "viewtime",
           "type" : "long"
         }, 
         {
           "name" : "userid",
           "type" : "string"
         }, 
         {
           "name" : "pageid",
           "type" : "string"
         }
       ]
    }
    """
    
    key_schema_str = """
    {
       "namespace": "ksql",
       "name": "key",
       "type": "record",
       "fields" : [
         {
           "name" : "pageid",
           "type" : "string"
         }
       ]
    }
    """
    
    value_schema = avro.loads(value_schema_str)
    key_schema = avro.loads(key_schema_str)
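    # The value and key must match the fields declared in their schemas.
    value = {"viewtime": 123, "userid": "user_1", "pageid": "page_1"}
    key = {"pageid": "page_1"}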
    
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    
    avroProducer = AvroProducer({
        'bootstrap.servers': 'mybroker,mybroker2',
        'on_delivery': delivery_report,
        'schema.registry.url': 'http://schema_registry_host:port'
        }, default_key_schema=key_schema, default_value_schema=value_schema)
    
    avroProducer.produce(topic='my_topic', value=value, key=key)
    avroProducer.flush()
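
To see why a plain JSON payload triggers "Unknown magic byte!", it may help to look at the bytes. The sketch below uses only the standard library and hand-encodes one pageviews record the way the Confluent wire format lays it out: a magic byte 0, a 4-byte big-endian schema ID, then the Avro binary body. The helper names (encode_long, encode_string, encode_pageview, frame, has_magic_byte) and the schema ID 1 are made up for illustration; in practice AvroProducer does this for you and gets the real ID from Schema Registry.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def encode_long(n: int) -> bytes:
    """Avro long: zigzag-encode, then emit 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set: more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_pageview(record: dict) -> bytes:
    """Fields are written in schema order, with no names or separators."""
    return (encode_long(record["viewtime"])
            + encode_string(record["userid"])
            + encode_string(record["pageid"]))


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix the payload with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def has_magic_byte(message: bytes) -> bool:
    """Rough version of the check the Avro deserializer performs first."""
    return len(message) >= 5 and message[0] == MAGIC_BYTE


record = {"viewtime": 123, "userid": "user_1", "pageid": "page_1"}
as_json = json.dumps(record).encode("utf8")   # what the plain Producer sent
as_avro = frame(1, encode_pageview(record))   # schema ID 1 is hypothetical

print(has_magic_byte(as_json))  # False: the first byte is '{' (0x7B)
print(has_magic_byte(as_avro))  # True
```

The JSON bytes start with `{` rather than 0, so the converter gives up before it even looks for a schema. This is only a way to inspect the framing; it does not register or validate a schema.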