scala, apache-spark, cassandra, spark-cassandra-connector

How to split a list into multiple partitions and send it to executors


When we use Spark to read data from CSV or a DB as follows, it automatically splits the data into multiple partitions and sends them to the executors:

spark
  .read
  .option("delimiter", ",")
  .option("header", "true")
  .option("mergeSchema", "true")
  .option("codec", properties.getProperty("sparkCodeC"))
  .format(properties.getProperty("fileFormat"))
  .load(inputFile)

Currently, I have an id list:

[1,2,3,4,5,6,7,8,9,...1000]

What I want to do is split this list into multiple partitions and send them to the executors, so that each executor runs a query per id, as in:

ids.foreach(id => {
  select * from table where id = id
})
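For illustration, a manual version of this idea might look like the following sketch, assuming a SparkContext sc configured with the Cassandra connection settings (CassandraConnector ships with the connector; the table name is taken from the schema below):

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// sketch only: distribute the ids, then run one query per id on the executors
val connector = CassandraConnector(sc.getConf)
val ids = sc.parallelize(1 to 1000)

val rows = ids.mapPartitions { part =>
  connector.withSessionDo { session =>
    // materialize while the session is still open
    part.flatMap(id =>
      session.execute(s"SELECT * FROM tab.events WHERE k = $id").asScala
    ).toList.iterator
  }
}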

When we load data from Cassandra, the connector generates the query as:

select columns from table where Token(k) >= ? and Token(k) <= ? 

This means the connector scans the whole table. In fact, I don't need a full scan; I just want to get the rows from the table where k (the partition key) is in the id list.

The table schema is:

CREATE TABLE IF NOT EXISTS tab.events (
    k int,
    o text,
    event text,
    PRIMARY KEY (k, o)
);

Or: how can I use Spark to load data from Cassandra using a predefined CQL statement, without scanning the whole table?


Solution

  • You simply need to use the joinWithCassandraTable function to select only the data required for your operation. Be aware that this function is only available via the RDD API.

    Something like this:

    import com.datastax.spark.connector._

    val joinWithRDD = your_df.rdd.joinWithCassandraTable("tab", "events")
    

    You need to make sure that the column names in your DataFrame match the partition key name in Cassandra - see the documentation for more information.
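    For instance, here is a minimal sketch using a plain RDD of ids instead of a DataFrame. The keyspace, table, and id range come from the question, and sc is assumed to be a SparkContext configured with spark.cassandra.connection.host:

    import com.datastax.spark.connector._

    // ids to look up; Tuple1 turns each id into a one-field row
    val ids = sc.parallelize(1 to 1000)

    val joined = ids
      .map(Tuple1(_))
      .joinWithCassandraTable("tab", "events")
      .on(SomeColumns("k"))   // join on the partition key, no full scan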

    The DataFrame implementation is only available in the DSE version of the Spark Cassandra Connector, as described in the following blog post.

    Update, September 2020: support for joining with Cassandra via the DataFrame API was added in Spark Cassandra Connector 2.5.0.
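    With 2.5.0 and later, a sketch of the DataFrame approach might look like this, assuming the connector's Catalyst extensions are enabled so the optimizer can rewrite the join into a "direct join" that reads only the matching partitions:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()

    // the Cassandra table as a DataFrame
    val events = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "tab", "table" -> "events"))
      .load()

    // id list as a single-column DataFrame named after the partition key
    val ids = spark.range(1, 1001).select(col("id").cast("int").as("k"))

    // with the extensions enabled, this can execute as a direct join
    val result = ids.join(events, Seq("k"))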