cassandra spark-streaming spark-jobserver

FiloDB + Spark Streaming Data Loss


I'm using FiloDB 0.4 with Cassandra 2.2.5 as the column and meta store, and I'm trying to insert data into it using Spark Streaming 1.6.1 + Jobserver 0.6.2. I use the following code to insert the data:

messages.foreachRDD(parseAndSaveToFiloDb);

private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> {
        final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect());
        final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages));
        final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages));

        dataFrame.write().format("filodb.spark")
                .option("database", keyspace)
                .option("dataset", dataset)
                .option("row_keys", rowKeys)
                .option("partition_keys", partitionKeys)
                .option("segment_key", segmentKey)
                .mode(saveMode).save();
        return null;
    };

The segment key is ":string /0", the row key is set to a column that is unique for each row, and the partition key is set to a column that is constant for all rows. In other words, my whole test data set goes to a single segment on a single partition. When I use a single one-node Spark instance, everything works fine and all the data is inserted. But when I run two separate one-node Spark instances (not as a cluster) at the same time, about 30-60% of the data is lost, even if I send messages one by one at intervals of several seconds. I checked that dataFrame.write() is executed for each message, so the issue happens after this line. When I set the segment key to a column that is unique for each row, all the data reaches Cassandra/FiloDB.

Please suggest solutions for the scenario with two separate Spark instances.


Solution

  • @psyduck, this is most likely because, in version 0.4, data for each partition can only be ingested on one node at a time. So to stick with the current version, you would need to split your data into multiple partitions and then ensure each worker only gets one partition. The easiest way to achieve this is to sort your data by partition key.

    I would strongly encourage you to move to the latest version, though: master (Spark 2.x / Scala 2.11) or the spark1.6 branch (Spark 1.6 / Scala 2.10). The latest version has many changes that are not in 0.4 and that would solve your problem:

    • Akka Cluster is used to automatically route your data to the right ingestion node. In this case, with the same data model, all your data would go to the right node, ensuring no data loss
    • TimeUUID-based chunk IDs, so even if multiple workers somehow write to the same partition (for example, in a split brain), data loss is avoided
    • A new "segment-less" data model, so you don't need to define any segment keys; it is more efficient for both reads and writes

    Feel free to reach out on our mailing list, https://groups.google.com/forum/#!forum/filodb-discuss
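
    For anyone staying on 0.4, the "one partition, one worker" advice above can be sketched in plain Java. This is only an illustration of the idea, not FiloDB or Spark API: the class PartitionRouter, the methods ownerOf and rowsForInstance, and the hash-modulo ownership rule are all hypothetical, and it assumes each of the two independent Spark instances knows its own index and the total number of instances. Each instance keeps only the rows whose partition key it owns and groups them in sorted key order before writing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of FiloDB or Spark.
final class PartitionRouter {

    // Deterministically picks the single instance allowed to ingest a partition key.
    // Assumption: every instance uses the same instanceCount.
    static int ownerOf(String partitionKey, int instanceCount) {
        return Math.floorMod(partitionKey.hashCode(), instanceCount);
    }

    // Keeps only the rows owned by this instance, grouped by partition key.
    // A TreeMap keeps the groups sorted by partition key.
    static Map<String, List<String[]>> rowsForInstance(
            List<String[]> rows, int keyIndex, int instanceId, int instanceCount) {
        Map<String, List<String[]>> byPartition = new TreeMap<>();
        for (String[] row : rows) {
            String key = row[keyIndex];
            if (ownerOf(key, instanceCount) == instanceId) {
                byPartition.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Illustrative rows: column 0 is the partition key, column 1 is a payload.
        List<String[]> rows = Arrays.asList(
                new String[]{"sensorA", "1"},
                new String[]{"sensorB", "2"},
                new String[]{"sensorA", "3"});
        int instanceCount = 2;
        for (int id = 0; id < instanceCount; id++) {
            Map<String, List<String[]>> mine = rowsForInstance(rows, 0, id, instanceCount);
            System.out.println("instance " + id + " ingests partitions " + mine.keySet());
        }
    }
}
```

    Note that with a partition key that is constant for all rows, as in the question, every row maps to the same owner, so the second instance would write nothing. Spreading the load requires a partition key with more than one value.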