Why does this explicit definition of a Storm stream not work, while the implicit one does?


Given a simple Apache Storm topology that uses the Stream API, there are two ways of initializing a Stream:


Version 1 - implicit declaration

    StreamBuilder builder = new StreamBuilder();
    builder
        .newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
        .filter(x -> x > 5)
        .print();

Result: This worked as expected: it prints only integers > 5.


Version 2 - explicit declaration

    Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
    integerStream.filter(x -> x > 5);
    integerStream.print();

Result: This did not work: all tuples were printed, including integers <= 5.


Question: Why does this explicit declaration not work properly, and how can it be fixed?


The topologies were run on a local cluster with the following commands, where IntSpout is just a simple spout that emits random integers:

    StormTopology topo = builder.build();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", new HashMap<>(), topo);

Solution

  • That's because integerStream.filter(x -> x > 5); returns a new stream that you ignore. integerStream itself is unchanged, so integerStream.print() prints the unfiltered stream.

    This works:

        Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
        Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
        filteredStream.print();
    

    There was also a syntax error in your first example as originally posted: it had an extra semicolon at the end of the fourth line.

        StreamBuilder builder = new StreamBuilder();
        builder
            .newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
            .filter(x -> x > 5) // <= there was a semicolon here
            .print();
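
To see the "filter returns a new stream" behavior in isolation, without a Storm cluster, here is a minimal sketch. MiniStream and FilterReturnValueDemo are hypothetical stand-ins written for this illustration; they are not part of Storm and only mimic the one property that matters here, namely that filter leaves the receiver untouched and hands back a new object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a stream type; this is NOT Storm's Stream class.
final class MiniStream<T> {
    private final List<T> items;

    private MiniStream(List<T> items) {
        this.items = items;
    }

    @SafeVarargs
    static <T> MiniStream<T> of(T... values) {
        return new MiniStream<>(new ArrayList<>(Arrays.asList(values)));
    }

    // Returns a NEW stream; the receiver is left unchanged.
    MiniStream<T> filter(Predicate<? super T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T item : items) {
            if (predicate.test(item)) {
                kept.add(item);
            }
        }
        return new MiniStream<>(kept);
    }

    // Returns a copy of the elements for inspection.
    List<T> toList() {
        return new ArrayList<>(items);
    }
}

class FilterReturnValueDemo {
    public static void main(String[] args) {
        MiniStream<Integer> integerStream = MiniStream.of(3, 7, 9, 2);

        // Mistake: the filtered stream is created and immediately discarded.
        integerStream.filter(x -> x > 5);
        System.out.println(integerStream.toList()); // [3, 7, 9, 2]

        // Fix: keep the returned stream and continue from it.
        MiniStream<Integer> filteredStream = integerStream.filter(x -> x > 5);
        System.out.println(filteredStream.toList()); // [7, 9]
    }
}
```

The chained form in Version 1 works for the same reason: each call in the chain is made on the object returned by the previous call, so print() is applied to the filtered stream rather than to the original one.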