
Publishing to Topic with registered schema in Apache Pulsar


As shown in this example from the Pulsar Schema Registry docs:

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
    .topic(topic)
    .create();
User user = new User("Tom", 28);
producer.send(user);

With the Java client you can register a schema for both the producer and the consumer. The docs also mention that clients in other languages do not support the schema registry.

Is it possible to send a message from a Python producer to a Pulsar topic that is consumed by a consumer with a registered schema? For example:

Consumer<User> processor = PulsarClient.builder()
            .serviceUrl("pulsar://pulsarhost:6650")
            .build()
            .newConsumer(JSONSchema.of(User.class))
            .topic("sometopic")
            .subscriptionName("somesubscription")
            .subscribe();

Python:

import pulsar

client = pulsar.Client('pulsar://pulsarhost:6650')

producer = client.create_producer('sometopic')
client.close()

Solution

  • Since the Pulsar 2.4 release, you can declare the schema in Python as well, both when publishing and when consuming.

    Given the dynamic nature of Python objects, we have defined a Record class that you can use to explicitly declare the schema format. For example:

    import pulsar
    from pulsar.schema import *
    
    class Example(Record):
        a = String()
        b = Integer()
        c = Boolean()
    
    
    client = pulsar.Client('pulsar://localhost:6650')
    producer = client.create_producer(
        topic='my-topic',
        schema=AvroSchema(Example))
    
    producer.send(Example(a='Hello', b=1))
    

    More examples are in the Python client docs: https://pulsar.apache.org/docs/en/client-libraries-python/#declaring-and-validating-schema