Running Trident Topology in Storm TrackedTopology Unit Test


How can I write a JUnit test for a Trident topology that lets tuples flow through the topology and verifies the output at each stage? I've tried running it within Storm's Testing framework, but that falls short: it gives me no way to verify the output, and Trident does not execute consistently under it.

Here's an example topology, with inline comments marking where I'm having the most trouble.

import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopologyTest {

    @Test
    public void testWordCountTopology() throws Exception {
        Testing.withTrackedCluster(new WordCountTestJob());
    }

    public class WordCountTestJob implements TestJob {

        @Override
        public void run(ILocalCluster cluster) throws Exception {

            // Create the test topology to submit
            TridentTopology termCountTopology = new TridentTopology();

            FeederSpout feeder = new FeederSpout(new Fields("text", "author"));

            TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
                    .each(new Fields("text"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .name("counter-output")
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                    .parallelismHint(6);

            TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());

            // Feed some random data into the topology
            feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
            feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));

            cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());

            // (!!) Runs, but a fixed sleep is fragile: the topology may run faster or slower on other systems
            // Utils.sleep(5000);

            // (!!) Fails with 5000ms Topology timeout
            // Testing.trackedWait(tracked, 3);

            /*
             * (!!) Always 0. Trident creates the streams and bolts internally with
             * different names, so how can we read them to verify?
             */
            List outputTuples = Testing.readTuples(tracked, "counter-output");
            assertEquals(0, outputTuples.size());
        }
    }
}

Beyond this, I've tried writing my own BaseFilter that stores every tuple and attaching it to the end of the stream, but it seems like there must be a better way. That approach also doesn't solve the problem of running the topology in a controlled manner. Is this something that Trident supports?


Solution

  • Use the class FeederBatchSpout (for Trident) instead of FeederSpout. FeederBatchSpout blocks on its own when it is fed, so there is no need to use Testing.trackedWait() or anything like that.

    Source: https://groups.google.com/forum/#!topic/storm-user/CrAdQEXo5OU
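
To illustrate why a feeder that blocks removes the need for Utils.sleep() or Testing.trackedWait(), here is a minimal sketch that uses only the Java standard library. It is not Storm code and does not claim to show how FeederBatchSpout is implemented: BlockingFeeder, Batch, and runDemo are hypothetical names invented for this example, and the worker thread merely stands in for the topology. The idea it demonstrates is that if feed() returns only after the batch has been processed, the test can assert on the results right after the call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class BlockingFeederSketch {

    /** One fed batch plus the semaphore its feeder waits on (hypothetical type). */
    static final class Batch {
        final List<String> sentences;
        final Semaphore done = new Semaphore(0);

        Batch(List<String> sentences) {
            this.sentences = sentences;
        }
    }

    /** Hypothetical stand-in for a blocking feeder; this is not a Storm class. */
    static final class BlockingFeeder {
        private final BlockingQueue<Batch> queue = new LinkedBlockingQueue<>();

        /** Hands a batch to the consumer and returns only once it has been processed. */
        void feed(List<String> sentences) throws InterruptedException {
            Batch batch = new Batch(sentences);
            queue.put(batch);
            batch.done.acquire(); // block until the consumer releases the permit
        }

        Batch take() throws InterruptedException {
            return queue.take();
        }
    }

    static Map<String, Long> runDemo() throws InterruptedException {
        BlockingFeeder feeder = new BlockingFeeder();
        Map<String, Long> counts = new ConcurrentHashMap<>();

        // The worker plays the role of the topology: split sentences and count words.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Batch batch = feeder.take();
                    for (String sentence : batch.sentences) {
                        for (String word : sentence.split(" ")) {
                            counts.merge(word, 1L, Long::sum);
                        }
                    }
                    batch.done.release(); // signal that the whole batch is processed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when the test is finished
            }
        });
        worker.setDaemon(true);
        worker.start();

        feeder.feed(Arrays.asList("well done is better than well said"));
        // feed() has returned, so the counts are complete: no sleep or polling needed.
        worker.interrupt();
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

In a real Trident test the same shape would apply: feed a batch, then check the state or output immediately afterwards, rather than waiting for an arbitrary amount of time.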