I'm new to Storm and Kafka, and after some time I managed to install both on a local virtual machine. I currently have a working WordCount topology that takes sentences from a Dropbox text file:
public void nextTuple() {
    final String APP_KEY = "XXXX";
    final String APP_SECRET = "XXXX";
    DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
    DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
    String accessToken = "XXXXXXXX";
    DbxClient client = new DbxClient(config, accessToken);
    String sentence = "";
    // Download /spout.txt from Dropbox into a local file, then read its contents.
    // try-with-resources closes the stream even if the download fails.
    try (FileOutputStream outputStream = new FileOutputStream("fromSpout.txt")) {
        client.getFile("/spout.txt", null, outputStream);
        sentence = readFile("fromSpout.txt");
    } catch (DbxException | IOException ex) {
        // FileNotFoundException is an IOException, so it is handled here as well.
        ex.printStackTrace();
    }
    // Compare contents, not references: sentence == "" only checks object identity.
    // trim() also strips tabs ("\t") and newlines.
    if (sentence == null || sentence.trim().isEmpty()) {
        Utils.sleep(1000);
        return;
    }
    if (sentence.length() < 2) {
        Utils.sleep(10000);
        return;
    }
    try {
        client.delete("/spout.txt");
    } catch (DbxException ex) {
        ex.printStackTrace();
    }
    _collector.emit(new Values(sentence));
    Utils.sleep(1000);
}
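Checking whether the downloaded text is empty has to compare string contents. In Java, == on two Strings compares object references, so a test like sentence == "" can be false even when the file was empty, and "/t" is a slash followed by a t rather than a tab ("\t"). The sketch below uses a hypothetical helper, isBlankSentence, which is not part of Storm or the Dropbox SDK:

```java
// Hypothetical helper (not part of Storm or the Dropbox SDK) that decides
// whether the text read from the file should be skipped instead of emitted.
final class SentenceCheck {
    private SentenceCheck() {}

    // Compares contents, not references. trim() removes every leading and
    // trailing character <= ' ', so "", " ", "\t" and "\n" all count as blank.
    static boolean isBlankSentence(String sentence) {
        return sentence == null || sentence.trim().isEmpty();
    }

    public static void main(String[] args) {
        String fromFile = new String(new char[0]); // a distinct empty String object
        System.out.println(fromFile == "");        // reference comparison
        System.out.println(fromFile.equals(""));   // content comparison
        System.out.println(isBlankSentence("\t")); // a lone tab is blank
    }
}
```

Running main prints false, true, true: the freshly created empty string is not the same object as the literal "", even though its contents are equal.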
Now I want to change my spout to read text from Kafka and emit it to the next bolt in the topology. I tried to follow many articles and code samples on GitHub without success, for example this kafka spout. Could anyone please give me some direction on how to implement the new spout.java file? Thank you!
Starting with the Storm 0.9.2 release, there is an external storm-kafka package that does this. The package was contributed back to the Storm community from the storm-kafka-0.8-plus project, and there is a test project showing its usage.
First, add the dependency to your Maven (or Leiningen/Gradle) build:
groupId: org.apache.storm
artifactId: storm-kafka
version: 0.9.2-incubating
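Then create the topology and the spout like this (for example in your topology's main method):
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.*;          // BrokerHosts, ZkHosts, StringScheme
import storm.kafka.trident.*;  // TridentKafkaConfig, OpaqueTridentKafkaSpout
import storm.trident.TridentTopology;

TridentTopology topology = new TridentTopology();
// ZooKeeper connect string of the Kafka cluster; brokers are discovered through it
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);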
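The spout hands each Kafka message to StringScheme, which, as far as I know for storm-kafka 0.9.x, decodes the message bytes as UTF-8 and emits a one-field tuple whose field is named "str". The JDK-only sketch below mimics that step so you can see the shape of the data reaching your next bolt; StringSchemeSketch is a hypothetical class, not the storm-kafka implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in that mimics what storm-kafka's StringScheme does with
// each Kafka message: decode the raw bytes as UTF-8 and wrap the result in a
// one-element tuple. It does not depend on Storm and is not the real class.
final class StringSchemeSketch {
    private StringSchemeSketch() {}

    // Name of the single output field (assumed to match StringScheme's "str").
    static final String FIELD_NAME = "str";

    static List<Object> deserialize(byte[] message) {
        String sentence = new String(message, StandardCharsets.UTF_8);
        return Collections.<Object>singletonList(sentence);
    }

    public static void main(String[] args) {
        byte[] kafkaMessage = "the cow jumped over the moon".getBytes(StandardCharsets.UTF_8);
        List<Object> tuple = deserialize(kafkaMessage);
        // A downstream bolt would read this value with tuple.getString(0)
        // or tuple.getStringByField("str").
        System.out.println(FIELD_NAME + " = " + tuple.get(0));
    }
}
```

Running main prints "str = the cow jumped over the moon". In the Trident topology, "str" is the field you would select when attaching the next step, for example topology.newStream("kafka-spout", spout).each(new Fields("str"), ...), where "kafka-spout" is just an illustrative stream id.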