Trident Storm-Cassandra, writing to a table with multiple primary keys


I'm learning how to use Storm's Trident (Storm 0.9.0.1) with Cassandra 2.0.5. I'm also using the com.hmsonline storm-cassandra 0.4.0-rc4 contrib module.

My goal is simply to insert some text rows into a table with the columns id (int), name (text) and sentence (text). id and name together form the primary key.

partitionPersist requires a StateUpdater, and for that I'm using com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>. As far as I can tell, it takes only one key as input, not two (I need both id and name). The tuple mapper (TridentTupleMapper) also uses a single key:

TridentTupleMapper<K, C, V> tupleMapper

Maybe I'm missing something, but how do I define multiple columns as keys?


Solution

  • Let me point you to the project that Brian and I have been working on, which uses Cassandra with Storm: https://github.com/hmsonline/storm-cassandra-cql

    It includes several examples showing how to develop a CqlTupleMapper that fits your key/column mapping. The code is still under development, but it has a backing map implementation for CQL3 that works both for persisting aggregations and for plain partitionPersist writes.

    For your needs, you would define a Trident topology that groups your incoming data (sentences) with groupBy:

    inputStream.groupBy(new Fields("sentences"))
    

    You would then implement a CqlTupleMapper, specifically its map(K key, V value) method, which builds a custom CQL INSERT statement from the grouped keys and the passed value. Your mapper would be something like:

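    @Override
    public Statement map(List<String> keys, String value) {
        // keys is expected to hold the id and the name, in that order
        Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);
        // id is an int column, so convert the String key before inserting it
        statement.value("id", Integer.parseInt(keys.get(0)));
        statement.value("name", keys.get(1));
        statement.value("sentence", value);
        return statement;
    }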
    

    I hope that helps.
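
To make the key-to-column mapping concrete without pulling in Storm or the Cassandra driver, here is a minimal plain-Java sketch of the idea behind the mapper above. It is not the storm-cassandra-cql API: the class CompositeKeyInsertSketch, its methods, and the keyspace and table names are all hypothetical and chosen only for illustration. It groups rows by the two-part key [id, name] and shows which values would be bound, in order, to a parameterized INSERT.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyInsertSketch {

    // Hypothetical keyspace and table names, for illustration only.
    static final String KEYSPACE_NAME = "demo";
    static final String TABLE_NAME = "sentences";

    /** Builds a parameterized CQL INSERT for the (id, name, sentence) table. */
    static String buildInsertCql() {
        return "INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME
                + " (id, name, sentence) VALUES (?, ?, ?)";
    }

    /** Returns the values to bind in column order: both key parts, then the sentence. */
    static List<Object> bindValues(List<Object> keys, String sentence) {
        if (keys.size() != 2) {
            throw new IllegalArgumentException("expected [id, name], got " + keys);
        }
        List<Object> values = new ArrayList<>(keys);
        values.add(sentence);
        return values;
    }

    /**
     * Groups (id, name, sentence) rows by the two-part key [id, name].
     * A later row with the same key replaces the earlier one.
     */
    static Map<List<Object>, String> groupByKey(List<Object[]> rows) {
        Map<List<Object>, String> grouped = new LinkedHashMap<>();
        for (Object[] row : rows) {
            grouped.put(Arrays.asList(row[0], row[1]), (String) row[2]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[] {1, "alice", "hello"},
                new Object[] {1, "bob", "hi"},
                new Object[] {1, "alice", "hello again"});
        // One INSERT per distinct (id, name) pair.
        for (Map.Entry<List<Object>, String> e : groupByKey(rows).entrySet()) {
            System.out.println(buildInsertCql() + " <- " + bindValues(e.getKey(), e.getValue()));
        }
    }
}
```

The sketch keeps only the last sentence per key because a Cassandra INSERT with an existing primary key overwrites the earlier row. In a real topology the grouping would be done by Trident and the statement built by the mapper, so treat this only as a picture of the data flow.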