
Spark writing to Cassandra with varying TTL


In Java Spark, I have a dataframe that has a 'bucket_timestamp' column, which represents the time of the bucket that the row belongs to.

I want to write the dataframe to a Cassandra DB. The data must be written with a TTL, and the TTL should depend on the bucket timestamp: each row's TTL should be calculated as ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp), where CONST_TTL is a constant TTL that I configured.

Currently I am writing to Cassandra from Spark with a constant TTL, using the following code:

df.write().format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {
                {
                    put("keyspace", "key_space_name");
                    put("table", "table_name");
                    put("spark.cassandra.output.ttl", Long.toString(CONST_TTL)); // Should depend on the bucket_timestamp column
                }
            }).mode(SaveMode.Overwrite).save();

One approach I thought of is to filter the data for each possible bucket_timestamp, calculate the TTL for that bucket, and write the filtered data to Cassandra. But this seems very inefficient and not the Spark way. Is there a way in Java Spark to provide a Spark column as the TTL option, so that the TTL differs for each row?

The solution should work with Java and Dataset<Row>: I found some solutions that do this with RDDs in Scala, but none that use Java and a dataframe.

Thanks!


Solution

  • Update: Support for this functionality in the DataFrame API has been available since Spark Cassandra Connector 3.0, which was released in May 2020.

    Old answer:

    The DataFrame API does not support this functionality yet. There is a JIRA ticket for it, https://datastax-oss.atlassian.net/browse/SPARKC-416, which you can watch to get notified when it is implemented.

    So the only choice you have is to use the RDD API, as described in @bartosz25's answer.
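
Whichever API does the write, the per-row value is the formula from the question, ROW_TTL = CONST_TTL - (CurrentTime - bucket_timestamp). The following is a minimal plain-Java sketch of that arithmetic only, not connector code. The class name `RowTtl` and both method names are hypothetical, and it assumes the TTL is expressed in whole seconds. It also flags rows whose computed TTL is zero or negative, since Cassandra treats a TTL of 0 as "no expiration" and rejects negative values, so such rows should probably be filtered out before writing.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: not part of Spark or the Cassandra connector.
final class RowTtl {
    private RowTtl() {}

    // ROW_TTL = CONST_TTL - (now - bucketTimestamp), in whole seconds.
    // The result may be zero or negative for buckets older than CONST_TTL.
    static long rowTtlSeconds(long constTtlSeconds, Instant now, Instant bucketTimestamp) {
        long ageSeconds = Duration.between(bucketTimestamp, now).getSeconds();
        return constTtlSeconds - ageSeconds;
    }

    // A TTL of 0 means "never expire" in Cassandra and negative TTLs are rejected,
    // so only strictly positive values are safe to pass through.
    static boolean isWritable(long ttlSeconds) {
        return ttlSeconds > 0;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-05-01T12:00:00Z");
        Instant bucket = Instant.parse("2020-05-01T11:00:00Z");
        long ttl = rowTtlSeconds(86_400L, now, bucket); // one-day CONST_TTL, one-hour-old bucket
        System.out.println(ttl + " writable=" + isWritable(ttl));
    }
}
```

With connector 3.0 or later, the same arithmetic could be computed into a dataframe column (for example with `unix_timestamp` from `org.apache.spark.sql.functions`) and that column's name passed as the `ttl` write option. That option name is given from memory and is an assumption here, so check it against the connector's DataFrame documentation for your version.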