Tags: apache-storm, trident, stream-processing

Storm multi-fields grouping


What I am trying to do is group the stream by two fields, "remote-client-ip" and the query string extracted from "request-params", count the tuples in each group, and then combine the counts for each "remote-client-ip" into a map. Here is my topology:

topology.newStream("kafka-spout-stream-1", repeatSpout)
                    .each(new Fields("str"), new URLParser(), new Fields(fieldNames))
                    .each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
                    .groupBy(new Fields("remote-client-ip", "query-string"))
                    .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
                    .groupBy(new Fields("remote-client-ip"))
                    .persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
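For reference, here is a plain-Java sketch of the result the topology above is meant to produce for one batch, outside Storm. It counts each distinct ("remote-client-ip", "query-string") pair, then gathers each client's counts into a map. The Request class, the method names, and the sample values are hypothetical; this models only the intended output, not how Trident executes the topology.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupCountSketch {

    // Hypothetical stand-in for a tuple after HTTPParameterExtractor;
    // only the two fields the topology groups on are kept.
    static final class Request {
        final String remoteClientIp;
        final String queryString;

        Request(String remoteClientIp, String queryString) {
            this.remoteClientIp = remoteClientIp;
            this.queryString = queryString;
        }
    }

    // Stage 1: like groupBy("remote-client-ip", "query-string") followed by
    // Count(): one count per distinct (ip, query string) pair.
    static Map<List<String>, Long> countPairs(List<Request> batch) {
        Map<List<String>, Long> counts = new LinkedHashMap<>();
        for (Request r : batch) {
            List<String> key = List.of(r.remoteClientIp, r.queryString);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    // Stage 2: like groupBy("remote-client-ip") followed by a combiner that
    // folds each client's (query string -> count) pairs into one map.
    static Map<String, Map<String, Long>> combinePerClient(Map<List<String>, Long> pairCounts) {
        Map<String, Map<String, Long>> perClient = new TreeMap<>();
        for (Map.Entry<List<String>, Long> e : pairCounts.entrySet()) {
            String ip = e.getKey().get(0);
            String queryString = e.getKey().get(1);
            perClient.computeIfAbsent(ip, k -> new TreeMap<>()).put(queryString, e.getValue());
        }
        return perClient;
    }

    public static void main(String[] args) {
        List<Request> batch = List.of(
            new Request("10.0.0.1", "storm"),
            new Request("10.0.0.1", "storm"),
            new Request("10.0.0.1", "trident"),
            new Request("10.0.0.2", "storm"));
        // prints {10.0.0.1={storm=2, trident=1}, 10.0.0.2={storm=1}}
        System.out.println(combinePerClient(countPairs(batch)));
    }
}
```

In the real topology, the second stage is the job of the asker's UserQueryStringCombiner, whose implementation is not shown here.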

However, while debugging I found that the data stream blocks at the first groupBy(), which is a multi-field grouping: Count() in the subsequent aggregate() call is never executed.

So I think I am misunderstanding how multi-field grouping interacts with aggregation.

Please let me know whether my speculation is right or wrong. Thank you!


Solution

  • In your topology, aggregate() re-specifies the fields you already grouped on ("remote-client-ip", "query-string") as its input fields. Count() does not need any input fields, so try this:

    .aggregate(new Count(), new Fields("user-word-count"))
    

    Instead of this:

    .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
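To see why Count() needs no input fields, here is a self-contained sketch of its combiner logic. Trident's built-in Count implements CombinerAggregator<Long> with init() returning 1 for every tuple, combine() summing, and zero() returning 0, so the field values are never read. The Combiner interface below is a simplified stand-in (a List<Object> replaces TridentTuple so it runs without Storm), and aggregate() is a hypothetical fold, not Trident's actual execution path. It only illustrates that the count is the same with or without input fields.

```java
import java.util.List;

public class CountSketch {

    // Simplified stand-in for Trident's CombinerAggregator<T>. The real
    // interface's init() takes a TridentTuple; a List<Object> is used here
    // so the sketch runs without Storm on the classpath.
    interface Combiner<T> {
        T init(List<Object> tuple);
        T combine(T val1, T val2);
        T zero();
    }

    // Mirrors the logic of Trident's built-in Count: every tuple contributes 1,
    // whatever its field values, and partial counts are summed.
    static final class Count implements Combiner<Long> {
        @Override public Long init(List<Object> tuple) { return 1L; }
        @Override public Long combine(Long val1, Long val2) { return val1 + val2; }
        @Override public Long zero() { return 0L; }
    }

    // Hypothetical fold of one group's tuples through a combiner.
    static <T> T aggregate(Combiner<T> agg, List<List<Object>> group) {
        T acc = agg.zero();
        for (List<Object> tuple : group) {
            acc = agg.combine(acc, agg.init(tuple));
        }
        return acc;
    }

    public static void main(String[] args) {
        Count count = new Count();
        // The same group of two tuples, with and without projected fields.
        List<List<Object>> withFields = List.of(
            List.<Object>of("10.0.0.1", "storm"),
            List.<Object>of("10.0.0.1", "storm"));
        List<List<Object>> noFields = List.of(List.<Object>of(), List.<Object>of());
        System.out.println(aggregate(count, withFields)); // prints 2
        System.out.println(aggregate(count, noFields));   // prints 2
    }
}
```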