I'm learning how to use Storm's Trident (Storm 0.9.0.1) with Cassandra 2.0.5. I'm also using the com.hmsonline storm-cassandra 0.4.0-rc4 contrib module.
My goal is simply to insert some rows into a table with id (int), name (text) and sentence (text) columns. id and name together form the primary key.
The partitionPersist call requires a StateUpdater, and for that I'm using com.hmsonline.storm.cassandra.trident.CassandraUpdater<K, C, V>.
But as far as I can tell, it takes only one key as input, not two (I need both id and name).
The tuple mapper (TridentTupleMapper) also uses one key:
TridentTupleMapper<K, C, V> tupleMapper
Maybe I'm missing something, but how do I define multiple columns as keys?
Let me point you to the project that Brian and I have been working on, which uses Cassandra with Storm: https://github.com/hmsonline/storm-cassandra-cql
There are several examples you can look at to see how to develop a CqlTupleMapper that fits your key/column mapping. The code is still being developed, but there is a suitable backing map implementation for CQL3 that works for persisting aggregations as well as for plain partitionPersist writes.
For your needs, you would define a Trident topology that groups your incoming data (sentences) with groupBy:
inputStream.groupBy(new Fields("sentences"))
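As I understand Trident's grouping, the values of the fields you pass to groupBy become the key list handed to the state, so grouping on two fields gives you a two-element key. Here is a minimal plain-Java sketch of that idea with no Storm dependency. The class name GroupKeySketch, the Row type and the sample data are hypothetical, and grouping on id and name is an assumption about your stream's field names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupKeySketch {
    /** Hypothetical stand-in for one incoming tuple. */
    static final class Row {
        final int id;
        final String name;
        final String sentence;

        Row(int id, String name, String sentence) {
            this.id = id;
            this.name = name;
            this.sentence = sentence;
        }
    }

    /** Groups rows by (id, name); every key is a two-element list. */
    static Map<List<Object>, List<String>> groupByIdAndName(List<Row> rows) {
        Map<List<Object>, List<String>> groups = new LinkedHashMap<>();
        for (Row row : rows) {
            // The composite key holds the values of both grouping fields, in order.
            List<Object> key = Arrays.<Object>asList(row.id, row.name);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row.sentence);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(1, "alice", "hello world"),
                new Row(1, "alice", "hello again"),
                new Row(2, "bob", "goodbye"));
        for (Map.Entry<List<Object>, List<String>> e : groupByIdAndName(rows).entrySet()) {
            System.out.println("key=" + e.getKey() + " sentences=" + e.getValue());
        }
    }
}
```

Each key here carries both id and name, which is the shape a mapper expecting two key columns would need.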
You would then implement a CqlTupleMapper, specifically its map(K key, V value) method, which builds a custom CQL insert statement that maps the keys and the passed value to columns. Your mapper method would be something like:
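import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;

@Override
public Statement map(List<String> keys, String value) {
    // keys holds the primary key parts in order: id first, then name.
    Insert statement = QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME);
    // id is an int column, so convert the string key before binding it.
    statement.value("id", Integer.parseInt(keys.get(0)));
    statement.value("name", keys.get(1));
    statement.value("sentence", value);
    return statement;
}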
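If you want to check the key-to-column mapping without pulling in the driver, here is a rough plain-Java sketch of the same idea: it produces the CQL text and the values that would be bound to it. The class name, the keyspace "demo" and the table "sentences" are hypothetical placeholders, and the int conversion assumes id arrives as a string key.

```java
import java.util.Arrays;
import java.util.List;

public class CompositeKeyInsertSketch {
    static final String KEYSPACE_NAME = "demo";   // hypothetical keyspace
    static final String TABLE_NAME = "sentences"; // hypothetical table

    /** CQL text with one placeholder per column: both key parts plus the value. */
    static String insertCql() {
        return "INSERT INTO " + KEYSPACE_NAME + "." + TABLE_NAME
                + " (id, name, sentence) VALUES (?, ?, ?)";
    }

    /** Maps the two-part key and the value to the values bound to the placeholders. */
    static Object[] bindValues(List<String> keys, String value) {
        if (keys.size() != 2) {
            throw new IllegalArgumentException("expected [id, name], got " + keys);
        }
        // id is an int column, so the string key is parsed before binding.
        return new Object[] { Integer.parseInt(keys.get(0)), keys.get(1), value };
    }

    public static void main(String[] args) {
        System.out.println(insertCql());
        System.out.println(Arrays.toString(
                bindValues(Arrays.asList("1", "alice"), "hello world")));
    }
}
```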
I hope that helps.