What I am trying to do is group the stream by two fields ("remote-client-ip", "request-params"), count the number of tuples in each group, and combine the counts into a map. Here is my topology:
topology.newStream("kafka-spout-stream-1", repeatSpout)
    .each(new Fields("str"), new URLParser(), new Fields(fieldNames))
    .each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
    .groupBy(new Fields("remote-client-ip", "query-string"))
    .aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
    .groupBy(new Fields("remote-client-ip"))
    .persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
But while debugging, I found that the data stream is blocked at the first groupBy(), which is a multi-field grouping. Count() in the subsequent aggregate statement is never executed.
So I think I am misunderstanding how multi-field grouping and aggregation interact.
Please let me know whether my guess is right or wrong. Thank you!
You are passing the already-grouped fields to the aggregate() function again in your topology. Try this:
.aggregate(new Count(), new Fields("user-word-count"))
Instead of this:
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
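To check the output against what you expect, it may help to compute the same result outside Storm. The following is only a plain-Java sketch of the intended semantics (count per IP and query-string pair, then one map per IP); it does not use the Trident API, and the class GroupCountSketch, the Request type, and the sample values are hypothetical names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCountSketch {

    // Hypothetical stand-in for one parsed tuple: (remote-client-ip, query-string).
    static final class Request {
        final String ip;
        final String queryString;

        Request(String ip, String queryString) {
            this.ip = ip;
            this.queryString = queryString;
        }
    }

    // Outer grouping by IP; inner grouping by query string with a count per pair.
    static Map<String, Map<String, Long>> countPerIp(List<Request> requests) {
        return requests.stream().collect(
            Collectors.groupingBy((Request r) -> r.ip,
                Collectors.groupingBy((Request r) -> r.queryString,
                    Collectors.counting())));
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the question.
        List<Request> requests = Arrays.asList(
            new Request("10.0.0.1", "q=storm"),
            new Request("10.0.0.1", "q=storm"),
            new Request("10.0.0.1", "q=kafka"),
            new Request("10.0.0.2", "q=storm"));

        System.out.println(countPerIp(requests));
    }
}
```

The nested grouping plays the role of the two groupBy steps in the topology: the inner count corresponds to "user-word-count", and the per-IP map corresponds to what UserQueryStringCombiner is presumably meant to build.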