Search code examples
apache-sparkpysparkcassandraspark-structured-streamingspark-cassandra-connector

How to deal with pySpark structured streaming coming from Kafka to Cassandra


I'm using pyspark to get data from Kafka and inserting it into cassandra. I'm almost there i just need the final step.

def Spark_Kafka_Receiver():

# STEP 1 OK!

    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
    .load()
    dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")

# STEP 2 OK!

    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()

# STEP 3 NEED HELP

def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)

    ???????

    # WRITE DATA FRAME ON CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()

So i have my Value that is in this format:

DataFrame[value: binary]

i would need to insert something that open my Value take the binary inside and create a nice dataframe with the correct format that mach the database and with it execute the last part of my code.


Solution

  • You don't need to use foreachBatch anymore. You just need to upgrade to Spark Cassandra Connector 2.5 that natively supports Spark Structured Streaming, so you can just write:

    dc.writeStream \
            .format("org.apache.spark.sql.cassandra") \
            .mode('append') \
            .options(table=table_name, keyspace=keyspace)
            .start() \
            .awaitTermination()
    

    Regarding the second part of your question - if you want to convert your value into a multiple columns, you need to use from_json function, passing the schema to it. Here is example in Scala, but Python code should be quite similar:

    val schemaStr = "id:int, value:string"
    val schema = StructType.fromDDL(schemaStr)
    val data = dc.selectExpr("CAST(value AS STRING)")
      .select(from_json($"value", schema).as("data"))
      .select("data.*").drop("data")
    

    and then you can write that data via writeStream