
Convert a Storm wordCount topology to use a Kafka spout

I'm new to Storm and Kafka, and after some time I managed to install both on a local virtual machine. I currently have a working wordCount topology that takes sentences from a Dropbox text file:

public void nextTuple() {

    final String APP_KEY = "XXXX";
    final String APP_SECRET = "XXXX";
    DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
    DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
    String accessToken = "XXXXXXXX";
    DbxClient client = new DbxClient(config, accessToken);
    String sentence = "";

    // Download /spout.txt from Dropbox into a local file, then read it back
    try (FileOutputStream outputStream = new FileOutputStream("fromSpout.txt")) {
        DbxEntry.File downloadedFile = client.getFile("/spout.txt", null, outputStream);
        sentence = readFile("fromSpout.txt");
    } catch (DbxException ex) {
        // Dropbox error: ignored
    } catch (IOException ex) {
        // Local file error (includes FileNotFoundException): ignored
    }

    // Nothing useful to emit (null, empty or whitespace only): wait 10 s before polling again
    if (sentence == null || sentence.trim().length() < 2) {
        Utils.sleep(10000);
        return;
    }
    try {
        client.delete("/spout.txt");
    } catch (DbxException ex) {
        // Dropbox error: ignored
    }
    _collector.emit(new Values(sentence));
}

Now I want to change my spout to read text from Kafka and pass it on to the next bolt in the topology. I have tried to follow many articles and code samples on GitHub without success, for example this Kafka spout. Could anyone point me in the right direction for implementing the new spout? Thank you!


  • Since the Storm 0.9.2 release, there is an external storm-kafka package that does exactly this. The package was contributed back to the Storm community from storm-kafka-0.8-plus, and there is a test project showing how to use it.

    First, add the dependency to your Maven (or Leiningen/Gradle) build:

    groupId: org.apache.storm
    artifactId: storm-kafka
    version: 0.9.2-incubating

    Then create the topology and spout like this:

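    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.*;
    import storm.kafka.trident.*;
    import storm.trident.TridentTopology;

    TridentTopology topology = new TridentTopology();
    BrokerHosts zk = new ZkHosts("localhost"); // ZooKeeper used by the Kafka brokers
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
    // Decode each Kafka message as a single string field
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);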
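With `StringScheme`, each Kafka message payload is decoded as a UTF-8 string and emitted as a one-field tuple (field name `str`), so the rest of the word count can stay largely as it is. As a rough, Storm-free illustration of that data flow, the sketch below simulates decode, split and count over a small batch of messages in plain Java. The class and method names are hypothetical, and splitting on whitespace is an assumption about what the existing split bolt does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of the Kafka -> split -> count flow.
// No Storm or Kafka classes are used; all names here are illustrative.
public class KafkaWordCountSketch {

    // Mirrors what StringScheme does with a message payload:
    // decode the raw bytes as UTF-8 into a single string.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // "Split" step (assumed behaviour): break a sentence on runs of whitespace.
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String w : sentence.trim().split("\\s+")) {
            if (!w.isEmpty()) { // a blank message yields no words
                words.add(w);
            }
        }
        return words;
    }

    // "Count" step: running total per word across all messages in the batch.
    static Map<String, Integer> countWords(List<byte[]> messages) {
        Map<String, Integer> counts = new TreeMap<>();
        for (byte[] message : messages) {
            for (String word : split(decode(message))) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<byte[]> batch = List.of(
                "the cow jumped over the moon".getBytes(StandardCharsets.UTF_8),
                "  the man went to the store  ".getBytes(StandardCharsets.UTF_8));
        // prints {cow=1, jumped=1, man=1, moon=1, over=1, store=1, the=4, to=1, went=1}
        System.out.println(countWords(batch));
    }
}
```

In the real topology the split step reads the sentence from the incoming tuple instead, for example with `tuple.getString(0)` in a plain bolt, or by passing `new Fields("str")` as the input fields of a Trident function.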