apache-spark, sliding-window, spark-streaming

Can you cascade sliding time windows in Spark Streaming?


I'm wondering if it's possible to cascade sliding windows into one another with Spark Streaming.

For example, I have counts coming in every second. I want to sum them over windows of 5, 15 and 30 seconds. Is it possible to reuse the 5-second window's result for the 15-second window, and the 15-second window's result for the 30-second one?

The aim is to avoid storing every 1-second update for all inputs over the length of the longest window (the fine granularity does not matter here). Instead, each window would reuse a DStream whose frequency matches the one it needs.

Here's an example:

    JavaPairDStream< String, Double >  test = input; 
    JavaPairDStream< String, Double >  test1 = input;
    // 5s:
    test = test.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(1000));  
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(5000), new Duration(5000));
    // 15s
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(5000));
    test1 = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(15000), new Duration(15000));
    // 30s
    test = test1.reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000));
    test.print();

I tried that but nothing gets printed.


Solution

  • Batch interval

    Window length and sliding interval must be multiples of the batch interval; when one windowed stream is built on another, they must also be multiples of the parent stream's sliding interval. To avoid falling behind (for example, emitting three 5-second sums within a 10-second window), the batch interval must be longer than the time needed to process one batch. I'll assume a batch interval of 1000 ms here.
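    The divisibility rule can be checked before building the pipeline. This is a plain-Java sketch with no Spark dependency; `WindowRules` and `isValidWindow` are hypothetical names. It mirrors the check Spark's windowed DStreams perform when created: window and slide must both be whole multiples of the parent stream's slide interval, which for the input stream is the batch interval.

```java
// Hypothetical helper, not part of Spark: encodes the rule that a windowed
// stream's window length and slide interval must be whole multiples of the
// parent stream's slide interval (the batch interval for the input stream).
public class WindowRules {

    public static boolean isValidWindow(long parentSlideMs, long windowMs, long slideMs) {
        return parentSlideMs > 0
            && windowMs > 0 && slideMs > 0
            && windowMs % parentSlideMs == 0
            && slideMs % parentSlideMs == 0;
    }

    public static void main(String[] args) {
        long batchMs = 1000;
        // A: 5s window, 5s slide over the 1s input stream
        System.out.println(isValidWindow(batchMs, 5000, 5000));
        // B: 15s window, 5s slide over A, whose slide is 5s
        System.out.println(isValidWindow(5000, 15000, 5000));
        // C: 30s window, 15s slide over B, whose slide is also 5s
        System.out.println(isValidWindow(5000, 30000, 15000));
        // A 7s slide over a 5s parent is not a whole multiple, so it is rejected
        System.out.println(isValidWindow(5000, 15000, 7000));
    }
}
```

    In the answer's chain, A's parent slide is the 1000 ms batch interval, while B and C each build on a parent that slides every 5000 ms.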

    Example

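        JavaPairDStream<String, Double> stream = input;

        // A: 5s sum every 5s
        JavaPairDStream<String, Double> stream5sCount = stream.reduceByKeyAndWindow(
            new SumReducer(), new Duration(5000), new Duration(5000));

        // B: 15s sum every 5s (overlapping: each A result enters 3 consecutive B results)
        JavaPairDStream<String, Double> stream15sCount = stream5sCount.reduceByKeyAndWindow(
            new SumReducer(), new Duration(15000), new Duration(5000));

        // C: 30s sum every 15s, divided by 3 to compensate for B's overlap
        // (exact only for a constant input rate). mapValues keeps the key
        // and divides only the value, so the result is still a pair stream.
        JavaPairDStream<String, Double> stream30sCount = stream15sCount
            .reduceByKeyAndWindow(new SumReducer(), new Duration(30000), new Duration(15000))
            .mapValues(v -> v / 3);

        stream30sCount.print();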
    

    Explanation

    (For two windowed streams A and B, where B reduces A: B's window length / A's slide interval = number of input tuples for B.)

    1. Every 5 seconds A sums up 5 tuples.
    2. Every 5 seconds B sums up A's last (15/5=) 3 results based on (3*5=) 15 original tuples.
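    3. Every 15 seconds C sums up B's last (30/5=) 6 results, i.e. (6*3*5=) 90 tuple contributions. Tuples are summed multiple times because B's window length is bigger than its sliding interval: the 90 contributions cover 40 seconds of input, with the 5-second chunks in the middle counted three times and those at the edges once or twice.
    4. A mapper divides by 3 to correct the duplication. This is only an approximation: it is exact for a constant input rate, but the edge chunks are counted fewer than three times and include input from outside the 30-second window.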
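    The duplication can be made concrete with a small Spark-free model of the A, B, C chain for a single key. It assumes that every stream starts at time 0 and emits at each multiple of its slide interval, summing the parent outputs emitted in (T - window, T]. `CascadeModel`, `window`, `streamC` and `trueSum` are illustrative names, not Spark API.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class CascadeModel {

    // Models one key of reduceByKeyAndWindow(sum, window, slide): at every multiple T
    // of `slide` up to `end`, sum the parent outputs emitted in (T - window, T].
    public static TreeMap<Long, Long> window(Map<Long, Long> parent, long window, long slide, long end) {
        TreeMap<Long, Long> out = new TreeMap<>();
        for (long t = slide; t <= end; t += slide) {
            long sum = 0;
            for (Map.Entry<Long, Long> e : parent.entrySet()) {
                if (e.getKey() > t - window && e.getKey() <= t) {
                    sum += e.getValue();
                }
            }
            out.put(t, sum);
        }
        return out;
    }

    // The answer's chain: A = (5s, 5s) over the 1s input, B = (15s, 5s) over A, C = (30s, 15s) over B.
    public static TreeMap<Long, Long> streamC(long[] perSecond) {
        long end = perSecond.length;
        TreeMap<Long, Long> input = new TreeMap<>();
        for (int i = 0; i < perSecond.length; i++) {
            input.put((long) (i + 1), perSecond[i]); // batch emitted at t = i + 1
        }
        TreeMap<Long, Long> a = window(input, 5, 5, end);
        TreeMap<Long, Long> b = window(a, 15, 5, end);
        return window(b, 30, 15, end);
    }

    // Direct sum of the raw per-second input over (t - window, t], for comparison.
    public static long trueSum(long[] perSecond, long t, long window) {
        long sum = 0;
        for (int i = 0; i < perSecond.length; i++) {
            long ts = i + 1;
            if (ts > t - window && ts <= t) {
                sum += perSecond[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] constant = new long[60];
        Arrays.fill(constant, 1);
        long[] burst = new long[60];
        burst[29] = 100; // a single burst in the batch emitted at t = 30

        // Constant input: C / 3 equals the true 30-second sum.
        System.out.println(streamC(constant).get(60L) / 3.0 + " vs " + trueSum(constant, 60, 30));
        // Burst just outside (30, 60]: C / 3 is about 66.7 while the true sum is 0.
        System.out.println(streamC(burst).get(60L) / 3.0 + " vs " + trueSum(burst, 60, 30));
    }
}
```

    With a constant input, C at t = 60 is 90 and dividing by 3 gives the true 30-second sum. With a single burst of 100 at t = 30, C at t = 60 is 200 (about 66.7 after dividing by 3), although that burst lies outside the window (30, 60].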

    Correction Step

    I assume your real application isn't as simple as a word count. You'll need an inverse function to fix the duplication error afterwards. You could also try to fix the problem before C (in the word count example, it's possible to divide earlier). Another solution would be to keep track of already processed tuples and only aggregate disjoint tuples. Which approach fits depends on your use case.
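    One way to get disjoint inputs is to give every intermediate stream a slide interval equal to its window length, as the `test1` chain in the question does: A = (5s window, 5s slide), B = (15s, 15s) over A, C = (30s, 15s) over B. Each C result then combines two non-overlapping 15-second sums. The Spark-free sketch below models that chain for a single key. It assumes all streams start at time 0 and emit at multiples of their slide interval; `TumblingCascade`, `window`, `cascade` and `trueSum` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingCascade {

    // Models one key of reduceByKeyAndWindow(sum, window, slide): at every multiple T
    // of `slide` up to `end`, sum the parent outputs emitted in (T - window, T].
    public static TreeMap<Long, Long> window(Map<Long, Long> parent, long window, long slide, long end) {
        TreeMap<Long, Long> out = new TreeMap<>();
        for (long t = slide; t <= end; t += slide) {
            long sum = 0;
            for (Map.Entry<Long, Long> e : parent.entrySet()) {
                if (e.getKey() > t - window && e.getKey() <= t) {
                    sum += e.getValue();
                }
            }
            out.put(t, sum);
        }
        return out;
    }

    // The question's test1 chain: disjoint 5s sums, disjoint 15s sums, then 30s every 15s.
    public static TreeMap<Long, Long> cascade(long[] perSecond) {
        long end = perSecond.length;
        TreeMap<Long, Long> input = new TreeMap<>();
        for (int i = 0; i < perSecond.length; i++) {
            input.put((long) (i + 1), perSecond[i]); // batch emitted at t = i + 1
        }
        TreeMap<Long, Long> a = window(input, 5, 5, end);
        TreeMap<Long, Long> b = window(a, 15, 15, end);
        return window(b, 30, 15, end);
    }

    // Direct sum of the raw per-second input over (t - window, t], for checking.
    public static long trueSum(long[] perSecond, long t, long window) {
        long sum = 0;
        for (int i = 0; i < perSecond.length; i++) {
            long ts = i + 1;
            if (ts > t - window && ts <= t) {
                sum += perSecond[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] perSecond = new long[90];
        for (int i = 0; i < perSecond.length; i++) {
            perSecond[i] = (i * 7) % 11; // arbitrary, non-constant test input
        }
        // Each cascaded 30s result matches the direct sum over the raw input.
        for (Map.Entry<Long, Long> e : cascade(perSecond).entrySet()) {
            long t = e.getKey();
            System.out.println("t=" + t + " cascade=" + e.getValue()
                + " direct=" + trueSum(perSecond, t, 30));
        }
    }
}
```

    In this model no correction step is needed: only A has to look at the raw 1-second batches, and C combines just two 15-second values per result, which matches the storage goal stated in the question.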