Search code examples
javaapache-kafkaapache-zookeeperbroker

How do I get connection string for brokers in Kafka


I am new to Kafka. Tried to implement consumer and producer classes to send and receive messages. Need to configure bootstrap.servers for both classes which is a list of broker's ip and port separated by ,. For example,

producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

Since the application will be running on the master node of a cluster, it should be able to retrieve the broker information from ZooKeeper just like the answer to Kafka: Get broker host from ZooKeeper.

public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
        System.out.println(id + ": " + brokerInfo);
    }
}

However this brokerInfo is in Json format which looks like this: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}

In this same post, another one suggested the following way of getting connection string for each broker and join them together with comma.

for (String id : ids) {
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) {
            brokerList.add(broker.connectionString());
        }
    }

If this Broker class is from org.apache.kafka.common.requests.UpdateMetadataRequest.Broker, it does not have methods createBroker and connectionString.

Found another similar post Getting the list of Brokers Dynamically. But it did not say how to get the attribute from broker info such as host and port. I can probably write a parser for the json like string to extract them, but I suspect there is more Kafka native way to do that. Any suggestions?

EDIT: I realized the Broker class is from kafka.cluster.Broker. Still it does not have method connectionstring().


Solution

  • You could use ZkUtils to retrieve all the broker information in the cluster, as show below:

    ZkUtils zk = ZkUtils.apply("zkHost:2181", 6000, 6000, true);
    List<Broker> brokers = JavaConversions.seqAsJavaList(zk.getAllBrokersInCluster());
    for (Broker broker : brokers) {  
        //assuming you do not enable security    
        System.out.println(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host());
    }
    zk.close();