Search code examples
pythonapache-kafkakafka-pythonkafka-topic

How to programmatically create topics using kafka-python?


I am getting started with Kafka and fairly new to Python. I am using this library named kafka-python to communicate with my Kafka broker. Now I need to dynamically create a topic from my code, from the docs what I see is I can call create_topics() method to do so, however I am not sure, how will I get an instance of this class. I am not able to understand this from the docs.

Can some one help me with this?


Solution

  • You first need to create an instance of KafkaAdminClient. The following should do the trick for you:

    from kafka.admin import KafkaAdminClient, NewTopic
    
    
    admin_client = KafkaAdminClient(
        bootstrap_servers="localhost:9092", 
        client_id='test'
    )
    
    topic_list = [NewTopic(name="example_topic", num_partitions=1, replication_factor=1)]
    admin_client.create_topics(new_topics=topic_list, validate_only=False)
    

    Alternatively, you can use confluent_kafka client which is a lightweight wrapper around librdkafka:

    from confluent_kafka.admin import AdminClient, NewTopic
    
    
    admin_client = AdminClient({"bootstrap_servers": "localhost:9092"})
    topic_list = [NewTopic("example_topic", 1, 1)]
    admin_client.create_topics(topic_list)