I want to add the Tika parser to my topology. I have set jsoup.treat.non.html.as.error to false in the config, and I have set up the Tika topology as described in the StormCrawler documentation.
The settings of the crawl topology are as follows:
builder.setSpout("spout", new MemorySpout(testURLs));
builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout");
builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key"));
builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch");
builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap");
builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup");
builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika");
builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt")
        .localOrShuffleGrouping("tika");
builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName)
        .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
        .localOrShuffleGrouping("shunt", Constants.StatusStreamName)
        .localOrShuffleGrouping("tika", Constants.StatusStreamName)
        .localOrShuffleGrouping("indexer", Constants.StatusStreamName);
return submit("crawl", conf, builder);
With this topology, I get an InvalidTopologyException. The problem seems to be caused by the status bolt: when I exclude it, the crawl topology works without any problem. How should I configure the status bolt?
The first connection is missing 'fetch', and you should connect 'jsoup', not 'shunt'. The latter does not emit outlinks to the status stream: it simply sends tuples that JSoup could not handle to a specific stream for Tika to use. See the StatusStream wiki page for some background.
The definition below should work.
builder.setBolt("status", new MemoryStatusUpdater())
        .localOrShuffleGrouping("fetch", Constants.StatusStreamName)
        .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
        .localOrShuffleGrouping("jsoup", Constants.StatusStreamName)
        .localOrShuffleGrouping("tika", Constants.StatusStreamName)
        .localOrShuffleGrouping("indexer", Constants.StatusStreamName);
This assumes that your HBaseIndexerBolt extends AbstractIndexerBolt and sends tuples to the status stream.
Please note that the MemoryStatusUpdater is mostly for testing and debugging: it won't necessarily work if you have more than one worker, and it will lose its data if the worker process is restarted.
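
As a rough illustration of why the original wiring is rejected, the idea can be modelled in plain Java: a subscription is only valid if the source component actually declares the stream being subscribed to. This is a sketch, not Storm's own validation code. The class StatusWiringCheck, its methods, and the table of declared streams are all hypothetical, and the stream sets are assumptions inferred from the explanation above (in particular that 'shunt' declares a "tika" stream but no "status" stream).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of stream subscriptions. Nothing here is Storm or StormCrawler API.
class StatusWiringCheck {

    // ASSUMPTION: the streams each component declares, inferred from the answer above.
    static Map<String, Set<String>> assumedDeclaredStreams() {
        return Map.of(
                "fetch", Set.of("default", "status"),
                "sitemap", Set.of("default", "status"),
                "jsoup", Set.of("default", "status"),
                "shunt", Set.of("default", "tika"), // no "status" stream
                "tika", Set.of("default", "status"),
                "indexer", Set.of("status"));
    }

    // Returns, in input order, the sources that do not declare the given stream.
    static List<String> sourcesMissingStream(Map<String, Set<String>> declared,
                                             String stream,
                                             List<String> sources) {
        List<String> missing = new ArrayList<>();
        for (String source : sources) {
            Set<String> streams = declared.get(source);
            if (streams == null || !streams.contains(stream)) {
                missing.add(source);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> declared = assumedDeclaredStreams();

        // Two-argument subscriptions from the question: 'shunt' is the odd one out.
        List<String> fromQuestion = List.of("sitemap", "shunt", "tika", "indexer");
        System.out.println(sourcesMissingStream(declared, "status", fromQuestion));

        // Subscriptions from the corrected definition: nothing is missing.
        List<String> corrected = List.of("fetch", "sitemap", "jsoup", "tika", "indexer");
        System.out.println(sourcesMissingStream(declared, "status", corrected));
    }
}
```

Under these assumptions the first call reports only 'shunt' and the second reports nothing, which matches the fix of swapping 'shunt' for 'jsoup' and adding 'fetch'.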