Tags: python, apache-kafka, kafka-producer-api, kafka-python

How to produce a Tombstone Avro Record in Kafka using Python?


My sink connector properties:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}
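
(Note: this JSON is in the shape returned by GET /connectors/jdbc-oracle; only the "config" object is what actually gets submitted. A minimal sketch of (re)registering the connector over the Connect REST API, assuming its default port 8083, which the post doesn't show:)

import json
import requests

# Load the JSON shown above (hypothetical file name) and push just its
# "config" map; PUT /connectors/<name>/config creates or updates the connector.
with open('jdbc-oracle.json') as f:
    connector = json.load(f)

resp = requests.put(
    'http://10.0.0.0:8083/connectors/jdbc-oracle/config',
    json=connector['config'],
)
resp.raise_for_status()
print(resp.json())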

My connect-avro-distributed.properties:

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
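
(Since both converters point at the Schema Registry, a quick sanity check is to list its subjects over the registry's REST API; once Avro records reach the orders topic you should see subjects like "orders-key" and "orders-value":)

import requests

# Sketch: confirm the Schema Registry configured above is reachable
# and see which subjects have been registered so far.
resp = requests.get('http://10.0.0.0:8081/subjects')
resp.raise_for_status()
print(resp.json())   # e.g. ['orders-key', 'orders-value']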

I send data like this:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['10.0.0.0:9092'],
)
message = producer.send('orders', key=b'{"id":1}', value=None)

But this fails with a serialization error.
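
Why this fails: the sink's AvroConverter can only deserialise records in the Confluent wire format, i.e. a zero magic byte, then a 4-byte big-endian schema ID from the Schema Registry, then the Avro-encoded payload. A raw JSON key like b'{"id":1}' carries no such framing, so the connector rejects it. A sketch of the framing, for illustration only (you would not normally hand-roll this):

import struct

def frame(schema_id: int, avro_payload: bytes) -> bytes:
    # Confluent wire format: 0x00 magic byte + 4-byte big-endian schema ID,
    # followed by the Avro binary encoding of the record.
    return struct.pack(">bI", 0, schema_id) + avro_payload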


Solution

  • I assume you want to produce Avro messages, so you need to serialise them properly. I'll be using the confluent-kafka-python library; if you don't already have it installed, just run

    pip install confluent-kafka[avro]
    

    And here's an example AvroProducer that sends an Avro message with a null value (a tombstone):

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
    
    
    value_schema_str = """
    {
       "type":"record",
       "name":"myrecord",
       "fields":[
          {
             "name":"id",
             "type":[
                "null",
                "int"
             ],
             "default":null
          },
          {
             "name":"product",
             "type":[
                "null",
                "string"
             ],
             "default":null
          },
          {
             "name":"quantity",
             "type":[
                "null",
                "int"
             ],
             "default":null
          },
          {
             "name":"price",
             "type":[
                "null",
                "int"
             ],
             "default":null
          }
       ]
    }
    """
    
    key_schema_str = """
    {
       "type":"record",
       "name":"key_schema",
       "fields":[
          {
             "name":"id",
             "type":"int"
          }
       ]
    }
    """
    
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    
    if __name__ == '__main__':
        value_schema = avro.loads(value_schema_str)
        key_schema = avro.loads(key_schema_str)
        #value = {"id": 1, "product": "myProduct", "quantity": 10, "price": 100}
        key = {"id": 1}
    
    
        avroProducer = AvroProducer({
            'bootstrap.servers': '10.0.0.0:9092',
            'on_delivery': delivery_report,
            'schema.registry.url': 'http://10.0.0.0:8081'
        }, default_key_schema=key_schema, default_value_schema=value_schema)
    
        # Omitting value produces a record with a null value: the tombstone
        # the sink needs (delete.enabled=true) to delete the row for this key.
        avroProducer.produce(topic='orders', key=key)
        avroProducer.flush()
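
    To confirm the tombstone actually landed on the topic, you can read the raw record back. A minimal sketch using confluent_kafka's plain Consumer, reusing the broker address from above (the group id is made up):

    from confluent_kafka import Consumer

    consumer = Consumer({
        'bootstrap.servers': '10.0.0.0:9092',
        'group.id': 'tombstone-check',   # hypothetical consumer group
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['orders'])

    msg = consumer.poll(10.0)
    if msg is not None and msg.error() is None:
        # A tombstone carries an Avro-framed key and a value of None
        print('key bytes:', msg.key(), 'value:', msg.value())
    consumer.close()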