Search code examples
sarama

Creating Kafka topic in sarama


Is it possible to create kafka topic in sarama? I know java API enables you do create topic but I couldn't find any information on how to do that in sarama. if it's possible, an example or explanation on which api I should use would be great thanks in advance


Solution

  • EDIT : Below was an old answer which still works, but at that point the sarama admin apis were under development. Since then ClusterAdmin apis have come a long way and today should be treated as a preferred way to solve this problem. Refer to the other 2 answers below if you are looking to solve this in 2020+.


    It is possible to use sarama for managing Topics in Kafka. I am writing a terraform provider for managing Kafka topics and use sarama to do heavy lifting in the backend.

    You need to use the sarama.Broker apis to do this. For example

    // Set broker configuration
    broker := sarama.NewBroker("localhost:9092")
    
    // Additional configurations. Check sarama doc for more info
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    
    // Open broker connection with configs defined above
    broker.Open(config)
    
    // check if the connection was OK
    connected, err := broker.Connected()
    if err != nil {
        log.Print(err.Error())
    }
    log.Print(connected)
    
    // Setup the Topic details in CreateTopicRequest struct
    topic := "blah25s"
    topicDetail := &sarama.TopicDetail{}
    topicDetail.NumPartitions = int32(1)
    topicDetail.ReplicationFactor = int16(1)
    topicDetail.ConfigEntries = make(map[string]*string)
    
    topicDetails := make(map[string]*sarama.TopicDetail)
    topicDetails[topic] = topicDetail
    
    request := sarama.CreateTopicsRequest{
        Timeout:      time.Second * 15,
        TopicDetails: topicDetails,
    }
    
    // Send request to Broker
    response, err := broker.CreateTopics(&request)
    
    // handle errors if any
    if err != nil {
        log.Printf("%#v", &err)
    }
    t := response.TopicErrors
    for key, val := range t {
        log.Printf("Key is %s", key)
        log.Printf("Value is %#v", val.Err.Error())
        log.Printf("Value3 is %#v", val.ErrMsg)
    }
    log.Printf("the response is %#v", response)
    
    // close connection to broker
    broker.Close()
    

    You can have a look at a working code at github. Remember to start kafka broker and import all golang dependency before running the code.