Tags: apache-spark, cassandra, apache-spark-sql, spark-structured-streaming, spark-cassandra-connector

How to store data from a DataFrame in a variable to use as a parameter in a select in Cassandra?


I have a Spark Structured Streaming application. The application receives data from Kafka and should use these values as parameters to query data from a Cassandra database. My question is: how do I use the data from the input (Kafka) DataFrame as "where" parameters in a Cassandra "select" without hitting the error below?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

This is my df input:

    val df = spark
      .readStream
      .format("kafka")
      .options(
        Map(
          "kafka.bootstrap.servers" -> kafka_bootstrap,
          "subscribe" -> kafka_topic,
          "startingOffsets" -> "latest",
          "fetchOffset.numRetries" -> "5",
          "kafka.group.id" -> groupId
        ))
      .load()
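
For context, the Kafka value can be decoded into the Message type used below roughly like this (a minimal sketch, assuming the payload is JSON with a single country field; the real schema may differ):

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical message type; the real payload may carry more fields
    case class Message(country: String)

    val schema = StructType(Seq(StructField("country", StringType)))

    // Kafka delivers key/value as binary: cast the value to a string,
    // parse the JSON, and flatten it into columns
    val messages = df
      .selectExpr("CAST(value AS STRING) AS json")
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*")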

I get this error whenever I try to store the DataFrame values in a variable to use as a parameter.

This is the method I created to try to convert the data into variables; with it, Spark throws the error I mentioned earlier:

    def processData(messageToProcess: DataFrame): DataFrame = {

      // Convert the incoming DataFrame to a typed Dataset
      val messageDS: Dataset[Message] = messageToProcess.as[Message]

      // collect() is an action, which is not allowed on a streaming
      // DataFrame outside of writeStream -- this triggers the exception
      val listData: Array[Message] = messageDS.collect()

      listData.foreach(x => println(x.country))

      val mensagem = messageToProcess
      mensagem
    }


Solution

  • When you need to use data in Kafka to query data in Cassandra, such an operation is a typical join between two datasets: you don't need to call .collect to find entries, you just do the join. Enriching data in Kafka with data from an external dataset is a very common pattern, and Cassandra provides the low-latency lookups it needs.

    Your code could look as follows (you'll need to configure the so-called DirectJoin; see the link below):

    import spark.implicits._
    import org.apache.spark.sql.cassandra._

    val df = spark.readStream.format("kafka")
      .options(Map(...)).load()
    // ... decode data in Kafka into columns ...
    val cassdata = spark.read.cassandraFormat("table", "keyspace").load
    val joined = df.join(cassdata, cassdata("pk") === df("some_column"))
    val processed = ... // process joined data

    val query = processed.writeStream
      // ... output data somewhere ...
      .start()
    query.awaitTermination()


    I have a detailed blog post on how to perform efficient joins with data in Cassandra.
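
    For a fuller picture, here is a sketch with the DirectJoin configuration filled in. The keyspace, table, and column names are placeholders, and the "directJoinSetting" option name follows the Spark Cassandra Connector docs as I recall them, so verify it against your connector version:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.cassandra._

    // Cassandra-specific Catalyst extensions must be enabled for DirectJoin
    val spark = SparkSession.builder()
      .appName("kafka-cassandra-enrichment")
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .config("spark.cassandra.connection.host", "cassandra-host") // placeholder
      .getOrCreate()

    import spark.implicits._

    // Cassandra side of the join; force the direct join instead of "auto"
    // ("directJoinSetting" is assumed here -- check your connector version)
    val cassdata = spark.read
      .cassandraFormat("table", "keyspace")
      .option("directJoinSetting", "on")
      .load()

    // "messages" is the decoded Kafka stream from earlier;
    // join it on the Cassandra partition key
    val joined = messages.join(cassdata, cassdata("pk") === messages("country"))

    val query = joined.writeStream
      .format("console") // placeholder sink for the sketch
      .start()
    query.awaitTermination()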