apache-spark apache-kafka cassandra spark-structured-streaming spark-cassandra-connector

Writing Spark streaming PySpark dataframe to Cassandra overwrites table instead of appending


I'm running a 1-node cluster of Kafka, Spark and Cassandra, all locally on the same machine.

From a simple Python script I'm streaming some dummy data every 5 seconds into a Kafka topic. Then, using Spark Structured Streaming, I'm reading this data stream (one row at a time) into a PySpark DataFrame with startingOffsets = latest. Finally, I'm trying to append this row to an existing Cassandra table.

I've been following (How to write streaming Dataset to Cassandra?) and (Cassandra Sink for PySpark Structured Streaming from Kafka topic).

One row of data is successfully written into the Cassandra table, but my problem is that it's overwritten every time rather than appended to the table. What might I be doing wrong?

Here's my code:

CQL DDL for creating kafkaspark keyspace followed by randintstream table in Cassandra:

DESCRIBE keyspaces;

CREATE KEYSPACE kafkaspark
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };
  
USE kafkaspark; 

CREATE TABLE randIntStream (
    key int,
    value int,
    topic text,
    partition int,
    offset bigint,
    timestamp timestamp,
    timestampType int,
    PRIMARY KEY (partition, topic)
);

Launch PySpark shell

./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Read latest message from Kafka topic into streaming DataFrame:

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()

Some transformations and checking schema:

df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()

Function for writing to Cassandra:

def writeToCassandra(writeDF, epochId):
    writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="randintstream", keyspace="kafkaspark") \
    .mode("append") \
    .save()

Finally, query to write to Cassandra from Spark:

query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

SELECT * on table in Cassandra: (screenshot not included)


Solution

  • If the row is always rewritten in Cassandra, then you may have an incorrect primary key in the table: you need to make sure that every row has a unique primary key. If you're creating the Cassandra table from Spark, then by default it just takes the first column as the partition key, and that column alone may not be unique.

    Update after schema was provided:

    Yes, that's the case I was referring to: you have a primary key of (partition, topic), but every row that you read from a specific partition of that topic will have the same primary key value, so each write overwrites the previous version. You need to make your primary key unique, for example by adding the offset or timestamp column to the primary key (although timestamp may not be unique if several records are produced within the same millisecond).

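    P.S. Also, in connector 3.0.0 you don't need foreachBatch; you can write the stream directly (a checkpoint location is required for this sink; the path below is a placeholder):

    df4.writeStream \
      .trigger(processingTime="5 seconds") \
      .format("org.apache.spark.sql.cassandra") \
      .options(table="randintstream", keyspace="kafkaspark") \
      .option("checkpointLocation", "/path/to/checkpoint") \
      .outputMode("update") \
      .start()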
    

    P.P.S. If you just want to move data from Kafka into Cassandra, you may consider using DataStax's Kafka Connector, which can be much more lightweight than Spark.
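
To illustrate the primary-key point from the answer, here is a minimal pure-Python sketch. It does not use Spark or Cassandra at all: it only models a table as a map from primary key to row, which is roughly how an insert with an existing key behaves when every column is written. The write_rows helper and the sample records are hypothetical, made up for this illustration.

```python
# Toy model of a Cassandra table: a map from primary key to row.
# Writing a row whose primary key already exists replaces the stored row,
# which is why an "append" can look like an overwrite.

def write_rows(rows, primary_key):
    """Apply rows in order, keyed by the given primary-key columns."""
    table = {}
    for row in rows:
        key = tuple(row[col] for col in primary_key)
        table[key] = row  # same key: the previous row is replaced
    return table

# Hypothetical Kafka records: one topic, one partition, increasing offsets.
# The last two share a timestamp to mimic records produced in the same millisecond.
records = [
    {"topic": "topic1", "partition": 0, "offset": 0, "timestamp": 1000, "value": 17},
    {"topic": "topic1", "partition": 0, "offset": 1, "timestamp": 1005, "value": 42},
    {"topic": "topic1", "partition": 0, "offset": 2, "timestamp": 1005, "value": 8},
]

original = write_rows(records, ("partition", "topic"))
with_timestamp = write_rows(records, ("partition", "topic", "timestamp"))
with_offset = write_rows(records, ("partition", "topic", "offset"))

print(len(original))        # 1: every record maps to the same key
print(len(with_timestamp))  # 2: the two records sharing a timestamp collide
print(len(with_offset))     # 3: the offset is unique within a topic partition
```

In CQL terms, the last case corresponds to a key such as PRIMARY KEY (partition, topic, offset), which is one way of following the suggestion above. Whether that is the right partitioning for a real workload depends on how the table will be queried.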