Tags: python, apache-kafka, avro, kafka-producer-api

Avro Producer sending key without key schema


I am using the Avro Producer in Python 2.7. I need to send a message with a key and a value. The value has an Avro schema registered for the topic, but there is no Avro schema for the key (I can't add a schema for the key, for legacy reasons).

This is my code:

import os

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, CachedSchemaRegistryClient


def main():
    kafkaBrokers = os.environ.get('KAFKA_BROKERS')
    schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
    topic = os.environ.get('KAFKA_TOPIC')

    # Value schemas are registered under the subject '<topic>-value'
    subject = '{}-value'.format(topic)
    sr = CachedSchemaRegistryClient(schemaRegistry)

    # get_latest_schema() returns a (schema_id, schema, version) tuple
    schema_id, schema, version = sr.get_latest_schema(subject)

    value_schema = avro.loads(str(schema))

    value = {'url': u'test.com', 'priority': 10}

    avroProducer = AvroProducer({
        'bootstrap.servers': kafkaBrokers,
        'schema.registry.url': schemaRegistry
    }, default_value_schema=value_schema)

    key = 1638895406382020875

    avroProducer.produce(topic=topic, value=value, key=key)
    avroProducer.flush()

I get the following error:

raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key

If I remove the key from the produce() call:

avroProducer.produce(topic=topic, value=value)

It works.
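For context, the error comes from the key branch of the legacy AvroProducer.produce(): a key of None is skipped, but any other key is Avro-encoded and therefore needs a key schema. The snippet below is a simplified, hypothetical re-creation of that branch (encode_key and the stand-in exception class are illustrative, not library code, and the real implementation may differ between versions), showing why the call succeeds only when the key is dropped.

```python
class KeySerializerError(Exception):
    """Stand-in for confluent_kafka.avro.serializer.KeySerializerError."""


def encode_key(key, key_schema=None):
    """Hypothetical helper mirroring the key branch of AvroProducer.produce()."""
    if key is None:
        # No key: nothing to serialize, so no key schema is consulted.
        return None
    if key_schema is None:
        # Any other key must be Avro-encoded, which requires a schema.
        raise KeySerializerError("Avro schema required for key")
    # Placeholder: the real producer would Avro-encode the key here.
    return ("avro-encoded", key)


print(encode_key(None))  # None: produce() without a key succeeds
try:
    encode_key(1638895406382020875)
except KeySerializerError as exc:
    print("KeySerializerError: {}".format(exc))
```

There is no raw-bytes fallback for the key in this path, which is why the answer below switches to a plain Producer.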

How can I send the key without having a key schema?


Solution

  • You'll need to use a regular Producer and call the serialization functions yourself: Avro-encode only the value and pass the key as a plain str/bytes, so no key schema is required.

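    from confluent_kafka import Producer, avro
    from confluent_kafka.avro import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

    # bootstrap_servers, schema_registry_url, topic, key and value are the same values as in the question
    schema_registry = CachedSchemaRegistryClient(schema_registry_url)
    avro_serializer = AvroSerializer(schema_registry)
    serialize_avro = avro_serializer.encode_record_with_schema  # bound method: (topic, schema, record, is_key=False)

    value_schema = avro.load('avro_schemas/value.avsc')  # TODO: Create avro_schemas folder

    def delivery_report(err, msg):
        # Invoked from flush()/poll() once per produced message
        if err is not None:
            print('Delivery failed: {}'.format(err))

    p = Producer({'bootstrap.servers': bootstrap_servers})

    # Only the value goes through Avro, under the subject '<topic>-value'
    value_payload = serialize_avro(topic, value_schema, value, is_key=False)
    # The plain Producer only accepts str/bytes keys; match your legacy key format
    p.produce(topic, key=str(key), value=value_payload, callback=delivery_report)
    p.flush()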
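To see why this works, it helps to look at the bytes involved. With Schema Registry, an Avro-serialized value is framed as a magic byte 0, a 4-byte big-endian schema ID, and then the Avro binary body, while the key sent through the plain Producer is just raw bytes. The stdlib-only sketch below builds both by hand; it assumes the value schema is a record whose fields are declared in the order url (string), priority (int), and the schema ID 1 is hypothetical.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Schema Registry-framed message


def zigzag_varint(n):
    """Encode an Avro int/long: zigzag-map the sign, then emit a base-128 varint."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


def avro_string(s):
    """Encode an Avro string: byte length as a varint, then the UTF-8 bytes."""
    data = s.encode('utf-8')
    return zigzag_varint(len(data)) + data


def frame_value(schema_id, url, priority):
    """Frame a record {url: string, priority: int} (assumed field order)."""
    body = avro_string(url) + zigzag_varint(priority)
    # Magic byte + 4-byte big-endian schema id, then the Avro binary body
    return struct.pack('>bI', MAGIC_BYTE, schema_id) + body


value_payload = frame_value(1, u'test.com', 10)  # schema id 1 is hypothetical
key_payload = str(1638895406382020875).encode('utf-8')  # raw key: no magic byte, no schema id
print(repr(value_payload))
print(repr(key_payload))
```

Consumers that deserialize only the value with Avro will read the key as plain bytes, so the key encoding (here a UTF-8 decimal string) should match whatever the legacy consumers already expect.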