
How to store Redis hash key using Trident


I am working on a real-time data project and am currently using the trident-redis library (https://github.com/kstyrc/trident-redis) to store sets of keys with counts. I would also like to store some more advanced breakdowns, including a latitude and longitude value for each key. With Redis on the command line I can use:

HSET 123 lat "40"
HSET 123 lon "-37"

to get

1) "lat"
2) "40"
3) "lon"
4) "-37"

with

HGETALL 123

How can I achieve this same effect using trident-redis? My topology currently looks like this:

public class TridentEventTopology {

    public static final StormTopology buildTopology(LocalDRPC drpc, StateFactory state) throws IOException {

        final int batchSize = 500;
        final BatchSpout spout = new BatchSpout(batchSize);

        final TridentTopology topology = new TridentTopology();
        TridentState batchedCounts = topology.newStream("spout", spout)
                                               .groupBy(new Fields("id"))
                                               .persistentAggregate(state, new Count(), new Fields("count"));

        topology.newDRPCStream("stream", drpc)
                .groupBy(new Fields("args"))
                .stateQuery(batchedCounts, new Fields("args"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static final void executeTopology() throws IOException {

        final StateFactory redis = RedisState.nonTransactional(new InetSocketAddress("localhost", 6379));
        final Config conf = new Config();
        final LocalDRPC drpc = new LocalDRPC();
        final LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("test", conf, buildTopology(drpc, redis));
    }
}

Solution

  • I ran into the same situation. I think you can modify the multiGet and multiPut methods of RedisState so that they call HGET and HSET (or HMGET and HMSET) with your desired hash keys and fields. This is actually already implemented for a fixed hash key, so all you need to do is extract the hash key and the field from the key parameters passed to multiGet and multiPut.
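
As a rough illustration of the "extract the fields from the key params" step, here is a minimal sketch in plain Java. It assumes the stream is grouped by two fields (for example "id" plus a hypothetical "field" such as lat or lon), so that each Trident key arrives as a two-element list. The class and method names (HashKeyGrouping, groupForHashWrite) are made up for this example and are not part of trident-redis; the actual Redis call is only indicated in a comment.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HashKeyGrouping {

    // Hypothetical helper: turns Trident-style keys of the form [hashKey, field]
    // plus their (already serialized) values into hashKey -> (field -> value),
    // i.e. one map per Redis hash.
    public static Map<String, Map<String, String>> groupForHashWrite(
            List<List<Object>> keys, List<String> values) {
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("keys and values must have the same size");
        }
        Map<String, Map<String, String>> grouped = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            List<Object> key = keys.get(i);
            if (key.size() != 2) {
                // Assumption of this sketch: exactly two group-by fields.
                throw new IllegalArgumentException("expected [hashKey, field], got " + key);
            }
            String hashKey = String.valueOf(key.get(0));
            String field = String.valueOf(key.get(1));
            grouped.computeIfAbsent(hashKey, k -> new LinkedHashMap<>())
                   .put(field, values.get(i));
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList("123", "lat"),
                Arrays.<Object>asList("123", "lon"));
        List<String> values = Arrays.asList("40", "-37");

        Map<String, Map<String, String>> grouped = groupForHashWrite(keys, values);

        // Inside a modified multiPut, each entry would become one hash write,
        // e.g. with a Jedis client: jedis.hmset(entry.getKey(), entry.getValue());
        System.out.println(grouped); // prints {123={lat=40, lon=-37}}
    }
}
```

A modified multiGet could split the keys the same way and read each field back with HGET or HMGET, returning the values in the same order as the incoming keys.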