apache-spark, cassandra, apache-zeppelin, spark-cassandra-connector

Spark Cassandra Connector not adding all records to DB


I am using version com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3.

I have an RDD from a Kafka stream:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode

kafkaStream.foreachRDD((rdd: RDD[String]) => {
  val count = rdd.count() // count once instead of triggering the job twice
  if (count > 0) {
    println(java.time.LocalDateTime.now + ". Consumed: " + count + " messages.")

    // Read the JSON messages, project out the transaction id, and append to Cassandra.
    sqlContext.read.json(rdd)
                .select("count_metadata.tran_id")
                .write
                .format("org.apache.spark.sql.cassandra")
                .options(Map("table" -> "tmp", "keyspace" -> "kspace"))
                .mode(SaveMode.Append)
                .save()
  } else {
    println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.")
  }
})

The RDD count is around 40k, but the Spark connector consistently populates the database with only 457 records.

sqlContext.read.json(rdd).select("count_metadata.tran_id").count

also reports roughly 40k rows.

Here is my table statement:

cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);

The tran_id is unique for each message.

What am I missing? Why aren't all 40k records making it to that table?

My logs are not showing any exceptions either.


Solution

  • The tran_id is unique for each message.

    I lied:

    // df is the same projection that gets written to Cassandra above
    val df = sqlContext.read.json(rdd).select("count_metadata.tran_id")
    println(df.distinct.count)
    

    prints....

    457
    

    Time to bring it up with our upstream source.
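
For context: with tran_id as the sole primary key, a Cassandra INSERT for a key that already exists is an upsert, so the 40k writes silently collapse onto the 457 distinct tran_id values and no error is ever raised. Here is a minimal sketch for surfacing the duplicated keys, assuming df is the same sqlContext.read.json(rdd).select("count_metadata.tran_id") projection from the question:

import org.apache.spark.sql.functions.{col, count}

// Group by the primary-key column and keep only the keys that occur
// more than once, heaviest offenders first.
df.groupBy("tran_id")
  .agg(count("*").as("occurrences"))
  .filter(col("occurrences") > 1)
  .orderBy(col("occurrences").desc)
  .show(20, false)

And if the duplicates turn out to be expected upstream, dropping them before the write makes the collapse explicit on the Spark side instead of a silent side effect of the upsert:

sqlContext.read.json(rdd)
            .select("count_metadata.tran_id")
            .dropDuplicates("tran_id")
            .write
            .format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "tmp", "keyspace" -> "kspace"))
            .mode(SaveMode.Append)
            .save()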