apache-spark, pyspark, cassandra, spark-cassandra-connector

Spark-Cassandra: how to get data based on a query


I have a Cassandra table which is quite huge, and right now I have a Spark-Cassandra connection set up with the following code.

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.1 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'
conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9042").setAppName("Sentinel").setMaster("spark://Local:7077")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

table_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table='movies', keyspace='movie_lens')\
        .load()

The primary key is movie_id, which is an integer. The .load() call loads the entire table into memory, which I want to avoid. One way I found is to use a filter:

table_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table='movies', keyspace='movie_lens')\
        .load()\
        .filter("movie_id = 37032")

But does the filter actually prevent loading the entire table into memory, or does it load first and then filter? Also, I have to query for many IDs. Let's say I need 1000 IDs, and the IDs change every day. How do I do that?


Solution

  • Yes, the Spark Cassandra Connector will perform so-called "predicate pushdown" if you're filtering on the partition key, and will load only the data matching the query (the .load function just loads the metadata; the actual data load happens the first time you perform an action that needs the data). The rules for when predicate pushdown happens in the Spark Cassandra Connector are well documented. You can also check this by running table_df.explain() and looking in the PushedFilters part for filters marked with an asterisk * (see the first sketch below).

    If you need to look up multiple IDs, you could use an .isin filter, but that's really not recommended with Cassandra. It's better to create a DataFrame with the IDs and perform a so-called direct join with the Cassandra DataFrame (available since SCC 2.5 for DataFrames, and earlier for RDDs); a sketch of this is shown below. I have a lengthy blog post on joining with data in Cassandra.
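
For reference, a minimal sketch of the pushdown check described in the first point, reusing the table and keyspace from the question (the exact PushedFilters output shown in the comment is illustrative):

filtered_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table='movies', keyspace='movie_lens')\
        .load()\
        .filter("movie_id = 37032")

# The physical plan should list the filter under PushedFilters marked with an
# asterisk, e.g. PushedFilters: [*EqualTo(movie_id,37032)], meaning it is
# executed by Cassandra rather than by Spark after a full scan.
filtered_df.explain()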
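
And a sketch of the direct join, assuming SCC >= 2.5 and that the session was started with --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions (the list of IDs here is a hypothetical stand-in for your daily-changing set):

# Build a small DataFrame holding the IDs you need today.
ids_df = sqlContext.createDataFrame([(37032,), (12345,), (67890,)], ['movie_id'])

movies_df = sqlContext.read\
        .format("org.apache.spark.sql.cassandra")\
        .options(table='movies', keyspace='movie_lens')\
        .load()

# With the Cassandra extensions enabled, a join on the full partition key is
# rewritten into per-key Cassandra requests instead of a full table scan;
# result.explain() should show "Cassandra Direct Join" in the plan.
result = ids_df.join(movies_df, 'movie_id')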