apache-storm

How can I see the current output of a running Storm topology?


Currently learning how to use Storm (version 2.1.0), I am a bit confused about a specific aspect of this data stream processing (DSP) engine: how is output data handled? Tutorials provide good explanations of system setup and running a first application. Unfortunately, I didn't find a page detailing the results generated by a topology.

With DSP applications, there is no final output, because the input is a continuously incoming stream of data (or perhaps we can say there is a final output when the application is stopped). What I would like is to be able to see the current output (the actual output data generated at the current time) of a running topology.

I'm able to run WordCountTopology. I understand the output of this topology is generated by the following snippet of code:

public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

My confusion is about the location of the <"word":string, "count":int> output. Is it only in memory, written to a database somewhere, or written to a file?

Going further with this question: what are the existing options for storing this in-progress output data? What is the "good way" of handling such data?

I hope my question is not too naive. And thanks to the StackOverflow community for always providing good help.


Solution

  • A few days have passed since I posted this question, and I am back to share what I have tried. Although I cannot tell whether it is the right way of doing it, the following two approaches answer my question.

    Simple System.out.println()

    The first thing I tried was to add a System.out.println("Hello World!") call directly within the prepare() method of my BaseBasicBolt. This method is called only once, at the beginning of each bolt thread's execution.

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        System.out.println("Hello World!");
    }
    

    The big challenge was to figure out where the log is written. By default, it goes to <storm installation folder>/logs/workers-artifacts/<topology name>/<worker-port>/worker.log, where <worker-port> is the port of the requested worker/slot.

    For instance, with conf.setNumWorkers(3), the topology requests access to 3 workers (3 slots). The values of <worker-port> will therefore be 6700, 6701 and 6702, the port numbers of the 3 slots (defined in storm.yaml under supervisor.slots.ports), as sketched below.
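
    For reference, this is roughly how the storm-starter WordCountTopology wires workers and bolts together (a sketch reconstructed from memory of the example, so the wrapper class name, the topology name "word-count" and the parallelism numbers are illustrative and may differ slightly from the shipped version):

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class WordCountTopologySketch {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // spout and bolts come from the storm-starter example
            builder.setSpout("spout", new RandomSentenceSpout(), 5);
            builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
            builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

            Config conf = new Config();
            conf.setNumWorkers(3);  // 3 workers -> worker.log files under ports 6700, 6701, 6702

            StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
        }
    }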

    Note: you will see as many "Hello World!" lines as the parallelism of your BaseBasicBolt. Since the split bolt is instantiated with builder.setBolt("split", new SplitSentence(), 8), it runs as 8 parallel executors, each one writing its own "Hello World!" to the log of the worker it runs in.
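
    Side note: Storm itself logs through SLF4J, so instead of System.out.println() a bolt can use a regular logger; as far as I can tell those messages also end up in worker.log. A minimal sketch of that alternative (which I have not explored further):

    // inside the bolt class; requires the org.slf4j.Logger and
    // org.slf4j.LoggerFactory imports (SLF4J is already on Storm's classpath)
    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        LOG.info("Hello World!");  // shows up in the worker's log instead of raw stdout
    }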

    Writing to a file

    For research purposes I have to analyse large amounts of logs, and I need them in a specific format. The solution I found is to append the logs to a dedicated file managed by each bolt.

    Below is my own implementation of this file-logging solution for the count bolt.

    // requires the java.io.FileWriter, BufferedWriter and PrintWriter imports
    public static class WordCount extends BaseBasicBolt {
        private String workerName;
        private FileWriter fw;
        private BufferedWriter bw;
        private PrintWriter out;
        private String logFile = "/var/log/storm/count.log";
        private Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void prepare(Map<String, Object> topoConf, TopologyContext context) {
            this.workerName = this.toString();
            try {
                // open the log file in append mode; autoflush so lines are not
                // left sitting in the buffer if the worker is killed
                this.fw = new FileWriter(logFile, true);
                this.bw = new BufferedWriter(fw);
                this.out = new PrintWriter(bw, true);
            } catch (Exception e) {
                System.out.println(e);
            }
        }

        @Override
        public void cleanup() {
            // close the writer on shutdown (not guaranteed to run on a hard kill)
            if (out != null) {
                out.close();
            }
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));

            // append one line per processed tuple to the log file
            out.println(this.workerName + ": Hello World!");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
    

    In this code, my log file is located at /var/log/storm/count.log, and calling out.println(text) appends the text at the end of this file. I am not sure whether this is thread-safe: with all parallel executors writing into the same file at the same time, data might be lost or interleaved.
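
    One idea to side-step that concern (a sketch of my own, not something I have benchmarked) is to give each task its own file, using the task id that Storm exposes through the TopologyContext; the path and naming scheme below are just an example:

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        // one file per task, e.g. /var/log/storm/count-7.log, so parallel
        // tasks never share a writer
        String taskLogFile = "/var/log/storm/count-" + context.getThisTaskId() + ".log";
        try {
            this.out = new PrintWriter(new BufferedWriter(new FileWriter(taskLogFile, true)), true);
        } catch (Exception e) {
            System.out.println(e);
        }
    }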

    Note: if your bolts are distributed across multiple machines, each machine will have its own log file(s). During my tests, I configured a simple cluster with 1 machine (running Nimbus + Supervisor + UI), so I had only 1 log file.

    Conclusion

    There are multiple ways to deal with output data and, more generally, with logging anything from Storm. I didn't find an official way of doing it, and the documentation is very light on this subject.

    While some of us would be satisfied with a simple System.out.println(), others might need to push large quantities of data to specific files, or maybe to a specialized database engine. Anything you can do with Java is possible with Storm, because it is plain Java programming.

    Any advice and additional comments to complete this answer will be gladly appreciated.