ruby, jruby, apache-storm

Implementing a Storm tick tuple in the RedStorm DSL


### TOPOLOGY ###
class WordCountTopology < DSL::Topology
  spout RandomSentenceSpout, :parallelism => 2

  bolt SplitSentenceBolt, :parallelism => 2 do
    source RandomSentenceSpout, :shuffle
  end

  bolt WordCountBolt, :parallelism => 2 do
    source SplitSentenceBolt, :fields => ["word"]
  end

  configure :word_count do |env|
    debug true
    max_task_parallelism 4
    if env == :cluster
      num_workers 6
      max_spout_pending(1000)
    end
  end

  on_submit do |env|
    if env == :local
      sleep(60)
      cluster.shutdown
    end
  end
end



### SPOUT ###
class RandomSentenceSpout < DSL::Spout
  output_fields :word

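  # Sleep inside the block so the pause applies before every emit;
  # a sleep at class level would run only once, when the class is loaded.
  on_send do
    sleep(10)
    @sentences[rand(@sentences.length)]
  end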

  on_init do
    @sentences = [
      "the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature"
    ]
  end
end


### SPLITSENTENCEBOLT ###
class SplitSentenceBolt < DSL::Bolt
  output_fields :word
  on_receive {|tuple| tuple[0].split(' ').map{|w| [w]}}
end




### WORDCOUNTBOLT ###
class WordCountBolt < DSL::Bolt
  output_fields :word, :count
  on_init {@counts = Hash.new{|h, k| h[k] = 0}}

  on_receive do |tuple|
    word = tuple[0].to_s
    @counts[word] += 1

    [word, @counts[word]]
  end
end

I want to implement a tick tuple: every 60 seconds the word count bolt should emit its results and reset the counters to 0.

Maybe something like this:

# in SplitSentenceBolt
# some function that sends a tick tuple every 60 seconds

# in WordCountBolt
if tick_tuple
  # emit results
  @counts.clear   # reset the counters
end

Can anyone help me implement this? I am new to the Storm world.


Solution

  • You can use Storm's built-in tick tuple functionality to have a bolt receive a tick tuple at a specified interval.

    In the bolt section of the topology definition simply add the following configuration:

    bolt MyBolt do
      ...
      set Backtype::Config::TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60
      ...
    end
    

    In your bolt, you can test for the tick tuple like this:

    if tuple.source_stream_id == "__tick"
      ...
    end