I'm using PySpark to get data from Kafka and insert it into Cassandra. I'm almost there; I just need the final step.
def Spark_Kafka_Receiver():
    # STEP 1 OK!
    dc = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "000.00.0.240:9092") \
        .option("subscribe", "MyTopic") \
        .load()
    dc.selectExpr("CAST(key as STRING)", "CAST(value AS STRING) as msg")
    # STEP 2 OK!
    dc.writeStream \
        .outputMode("append") \
        .foreachBatch(foreach_batch_function) \
        .start() \
        .awaitTermination()
# STEP 3 NEED HELP
def foreach_batch_function(df, epoch_id):
    Value = df.select(df.value)
    ???????
    # WRITE DATA FRAME ON CASSANDRA
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table=table_name, keyspace=keyspace) \
        .save()
So I have my Value, which is in this format:
DataFrame[value: binary]
I need to insert something that opens my Value, takes the binary inside, and creates a proper DataFrame whose format matches the database, and then executes the last part of my code with it.
You don't need to use foreachBatch anymore. You just need to upgrade to Spark Cassandra Connector 2.5, which natively supports Spark Structured Streaming, so you can just write:
dc.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .options(table=table_name, keyspace=keyspace) \
    .outputMode("append") \
    .start() \
    .awaitTermination()
(Note that a streaming writer has no mode method — use outputMode instead — and a checkpoint location is required for a fault-tolerant sink.)
Regarding the second part of your question - if you want to convert your value into multiple columns, you need to use the from_json function, passing the schema to it. Here is an example in Scala, but the Python code should be quite similar:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

val schemaStr = "id INT, value STRING"
val schema = StructType.fromDDL(schemaStr)
val data = dc.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select("data.*")
and then you can write that data via writeStream.