
Storm Crawler Configuration with Tika for recursive crawls


I want to add the Tika parser to my topology. I have set jsoup.treat.non.html.as.error to false in the config and set up the Tika topology as described in the StormCrawler documentation.

The settings of the crawl topology are as follows:

builder.setSpout("spout", new MemorySpout(testURLs));

builder.setBolt("partitioner", new URLPartitionerBolt()).shuffleGrouping("spout");

builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping("partitioner", new Fields("key"));

builder.setBolt("sitemap", new SiteMapParserBolt()).localOrShuffleGrouping("fetch");

builder.setBolt("jsoup", new JSoupParserBolt()).localOrShuffleGrouping("sitemap");

builder.setBolt("shunt", new RedirectionBolt()).localOrShuffleGrouping("jsoup");

builder.setBolt("tika", new ParserBolt()).localOrShuffleGrouping("shunt", "tika");

builder.setBolt("indexer", new HBaseIndexerBolt(), numWorkers).localOrShuffleGrouping("shunt")
                    .localOrShuffleGrouping("tika");

builder.setBolt("status", new MemoryStatusUpdater()).localOrShuffleGrouping(Constants.StatusStreamName)
                    .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
                    .localOrShuffleGrouping("shunt", Constants.StatusStreamName)
                    .localOrShuffleGrouping("tika", Constants.StatusStreamName)
                    .localOrShuffleGrouping("indexer", Constants.StatusStreamName);

return submit("crawl", conf, builder);

With this topology, I get an InvalidTopologyException. The problem seems to be caused by the status bolt, because the crawl topology works without any problem when I exclude it. How should I configure the status bolt?


Solution

  • The first connection is missing 'fetch', and you should connect 'jsoup', not 'shunt'. The latter does not emit outlinks to the status stream: it simply sends tuples that JSoup could not handle to a specific stream for Tika to use. See the StatusStream wiki for some background.

    The definition below should work.

    builder.setBolt("status", new MemoryStatusUpdater())
        .localOrShuffleGrouping("fetch", Constants.StatusStreamName)
        .localOrShuffleGrouping("sitemap", Constants.StatusStreamName)
        .localOrShuffleGrouping("jsoup", Constants.StatusStreamName)
        .localOrShuffleGrouping("tika", Constants.StatusStreamName)
        .localOrShuffleGrouping("indexer", Constants.StatusStreamName);
    

    This assumes that your HBaseIndexerBolt extends AbstractIndexerBolt and sends tuples to the status stream.

    Please note that the MemoryStatusUpdater is mostly for testing and debugging: it won't necessarily work if you have more than one worker, and it will lose its data if the topology or the worker process is restarted.
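To see why the original wiring is rejected while the corrected one is accepted, here is a minimal sketch that models the check in plain Java. It is not Storm's own validation code. The class TopologyWiringCheck, its methods, and the table of streams each component declares are all hypothetical and inferred from the answer above; the real declarations live in each bolt's declareOutputFields. The sketch also relies on the single-argument localOrShuffleGrouping taking a component id, so passing Constants.StatusStreamName (the string "status") subscribes the status bolt to the default stream of a component named "status".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for topology validation; this is NOT Storm's own code.
class TopologyWiringCheck {

    // One input of a bolt: it reads `stream` of the component `source`.
    static final class Input {
        final String source;
        final String stream;

        Input(String source, String stream) {
            this.source = source;
            this.stream = stream;
        }
    }

    // ASSUMPTION: streams declared by each component, inferred from the answer.
    static Map<String, Set<String>> assumedStreams() {
        return Map.of(
            "fetch", Set.of("default", "status"),
            "sitemap", Set.of("default", "status"),
            "jsoup", Set.of("default", "status"),
            "shunt", Set.of("default", "tika"),   // no status stream
            "tika", Set.of("default", "status"),
            "indexer", Set.of("status"),
            "status", Set.<String>of());          // assumed to declare no output
    }

    // Inputs of the "status" bolt as wired in the question.
    static List<Input> originalInputs() {
        return List.of(
            new Input("status", "default"),  // localOrShuffleGrouping(Constants.StatusStreamName)
            new Input("sitemap", "status"),
            new Input("shunt", "status"),
            new Input("tika", "status"),
            new Input("indexer", "status"));
    }

    // Inputs of the "status" bolt as wired in the answer.
    static List<Input> correctedInputs() {
        return List.of(
            new Input("fetch", "status"),
            new Input("sitemap", "status"),
            new Input("jsoup", "status"),
            new Input("tika", "status"),
            new Input("indexer", "status"));
    }

    // Reports every input that names an unknown component or an undeclared stream.
    static List<String> findInvalidInputs(Map<String, Set<String>> declared, List<Input> inputs) {
        List<String> problems = new ArrayList<>();
        for (Input in : inputs) {
            Set<String> streams = declared.get(in.source);
            if (streams == null) {
                problems.add("unknown component '" + in.source + "'");
            } else if (!streams.contains(in.stream)) {
                problems.add("component '" + in.source + "' has no stream '" + in.stream + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println("original:  " + findInvalidInputs(assumedStreams(), originalInputs()));
        System.out.println("corrected: " + findInvalidInputs(assumedStreams(), correctedInputs()));
    }
}
```

Under these assumptions, the original wiring has two bad inputs (the self-subscription and the 'shunt' status stream) and the corrected wiring has none. If your indexer or a custom bolt does not declare the status stream, it would show up in the same way.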