
Create nested case class instance from a DataFrame


I have these two case classes:

case class Inline_response_200(
  nodeid: Option[String],
  data: Option[List[ReadingsByEpoch_data]]
)

and

case class ReadingsByEpoch_data(
  timestamp: Option[Int],
  value: Option[String]
)

And I have a Cassandra table that has data like nodeid|timestamp|value. Basically, each nodeid has multiple timestamp-value pairs.

All I want to do is create instances of Inline_response_200, each with its proper List of ReadingsByEpoch_data, so that Jackson can serialize them to JSON.

I've tried

val res = sc.cassandraTable[Inline_response_200]("test", "taghistory").limit(100).collect()

But I get this error

java.lang.IllegalArgumentException: Failed to map constructor parameter data in com.wordnik.client.model.Inline_response_200 to a column of test.taghistory

This makes sense, because there is no data column in my Cassandra table. But then how can I create the instances correctly?

Cassandra table looks like this:

CREATE TABLE test.taghistory (
nodeid text,
timestamp text,
value text,
PRIMARY KEY (nodeid, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)

EDIT
As per Alex Ott's suggestion:

val grouped = data.groupByKey.map {
  case (k, v) =>
    Inline_response_200(k.getString(0), v.map(x => ReadingsByEpoch_data(x.getInt(1), x.getString(2))).toList)
}
grouped.collect().toList

I'm close, but not there yet. This gives me the format I expect; however, it's creating one instance of Inline_response_200 per record:

[{"nodeid":"Tag3","data":[{"timestamp":1519411780,"value":"80.0"}]},{"nodeid":"Tag3","data":[{"timestamp":1519411776,"value":"76.0"}]}]  

In this example I need to have one nodeid key, and an array of two timestamp-value pairs, like this:

[{"nodeid":"Tag3","data":[{"timestamp":1519411780,"value":"80.0"},{"timestamp":1519411776,"value":"76.0"}]}]

Maybe I'm grouping the wrong way?


Solution

  • Since the data in your table has the shape nodeid|timestamp|value (as the schema confirms), you can't map it directly onto the nested structure that you created. Instead, read the table as a pair RDD keyed by nodeid:

    val data = sc.cassandraTable[(String,String,Option[String])]("test", "taghistory")
         .select("nodeid","timestamp","value").keyBy[String]("nodeid")
    

    and then transform it into the structure that you need: call groupByKey on that pair RDD and build one Inline_response_200 from each group, like this:

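    // Wrap values in Some(...) to match the Option fields; timestamp is text in the table, so it is parsed to Int
    val grouped = data.groupByKey.map{case (k,v) => Inline_response_200(Some(k),
           Some(v.map(x => ReadingsByEpoch_data(scala.util.Try(x._2.toInt).toOption, x._3)).toList))}
    grouped.collect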
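
To check the grouping logic without a Spark or Cassandra cluster, the same transformation can be sketched on a plain Scala collection. This is only an illustration under stated assumptions: the object name GroupingSketch and the helper group are hypothetical, the rows are typed as (nodeid, timestamp, value) tuples like the ones read above, and the timestamp text is assumed to hold an integer (it becomes None when it does not parse).

```scala
// Case classes as given in the question.
case class ReadingsByEpoch_data(timestamp: Option[Int], value: Option[String])
case class Inline_response_200(nodeid: Option[String], data: Option[List[ReadingsByEpoch_data]])

// Hypothetical names, used only for this sketch.
object GroupingSketch {
  // Groups flat (nodeid, timestamp, value) rows into one instance per nodeid.
  def group(rows: Seq[(String, String, Option[String])]): List[Inline_response_200] =
    rows.groupBy(_._1).toList.sortBy(_._1).map { case (nodeid, readings) =>
      Inline_response_200(
        Some(nodeid),
        Some(readings.map { case (_, ts, v) =>
          // The table stores timestamp as text; a non-numeric value becomes None.
          ReadingsByEpoch_data(scala.util.Try(ts.toInt).toOption, v)
        }.toList)
      )
    }

  def main(args: Array[String]): Unit = {
    val rows: Seq[(String, String, Option[String])] = Seq(
      ("Tag3", "1519411780", Some("80.0")),
      ("Tag3", "1519411776", Some("76.0")),
      ("Tag4", "1519411790", None)
    )
    group(rows).foreach(println)
  }
}
```

The important point is that the grouping key is the nodeid alone. If the key also contained the timestamp, every record would end up in its own group.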