Search code examples
apache-sparkcassandraspark-streamingspark-cassandra-connectordstream

merge spark dStream with variable to saveToCassandra()


I have a DStream[String, Int] with pairs of word counts, e.g. ("hello" -> 10). I want to write these counts to cassandra with a step index. The index is initialized as var step = 1 and is incremented with each microbatch processed.

The cassandra table created as:

CREATE TABLE wordcounts (
    step int,
    word text,
    count int,
primary key (step, word)
);

When trying to write the stream to the table...

stream.saveToCassandra("keyspace", "wordcounts", SomeColumns("word", "count"))

... I get java.lang.IllegalArgumentException: Some primary key columns are missing in RDD or have not been selected: step.

How can I prepend the step index to the stream in order to write the three columns together?

I'm using spark 2.0.0, scala 2.11.8, cassandra 3.4.0 and spark-cassandra-connector 2.0.0-M3.


Solution

  • As noted, while the Cassandra table expects something of the form (Int, String, Int), the wordCount DStream is of type DStream[(String, Int)], so for the call to saveToCassandra(...) to work, we need a DStream of type DStream[(Int, String, Int)].

    The tricky part in this question is how to bring a local counter, that is by definition only known in the driver, up to the level of the DStream.

    To do that, we need to do two things: "lift" the counter to a distributed level (in Spark, we mean "RDD" or "DataFrame") and join that value with the existing DStream data.

    Departing from the classic Streaming word count example:

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    

    We add a local var to hold the count of the microbatches:

    @transient var batchCount = 0
    

    It's declared transient, so that Spark doesn't try to close over its value when we declare transformations that use it.

    Now the tricky bit: Within the context of a DStream transformation, we make an RDD out of that single variable and join it with underlying RDD of the DStream using cartesian product:

    val batchWordCounts = wordCounts.transform{ rdd => 
      batchCount = batchCount + 1
    
      val localCount = sparkContext.parallelize(Seq(batchCount))
      rdd.cartesian(localCount).map{case ((word, count), batch) => (batch, word, count)}
    }
    

    (Note that a simple map function would not work, as only the initial value of the variable would be captured and serialized. Therefore, it would look like the counter never increased when looking at the DStream data.

    Finally, now that the data is in the right shape, save it to Cassandra:

    batchWordCounts.saveToCassandra("keyspace", "wordcounts")