scala · apache-spark · cassandra · spark-cassandra-connector

Pass columnNames dynamically to cassandraTable().select()


I'm reading a query from a file at run-time and executing it in a Spark + Cassandra environment.

I'm executing:
sparkContext.cassandraTable("keyspaceName", "colFamilyName").select("col1", "col2", "col3").where("some condition = true")

Query in file:

select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true

Here col1, col2, col3 can vary depending on the query parsed from the file.

Question:
How do I pick the column names from the query and pass them to select() at runtime?

I have tried many ways to do it:
1. The dumbest thing I tried (which obviously threw an error) -

var str = "col1,col2,col3"
var selectStmt = str.split("\\,").map { x => "\"" + x.trim() + "\"" }.mkString(",")
var queryRDD = sc.cassandraTable("keyspaceName", "colFamilyName").select(selectStmt)

Any ideas are welcome.

Side Notes :
1. I do not want to use CassandraContext because it will be deprecated/removed in the next release (https://docs.datastax.com/en/datastax_enterprise/4.5/datastax_enterprise/spark/sparkCCcontext.html)
2. I'm on
- a. Scala 2.11
- b. spark-cassandra-connector_2.11:1.6.0-M1
- c. Spark 1.6


Solution

  • Use Cassandra Connector

    Your use case sounds like you actually want to use CassandraConnector objects. These give you direct access to a per-executor JVM session pool and are ideal for executing arbitrary queries. This ends up being much more efficient than creating an RDD for each query.

    This would look something like

    val connector = CassandraConnector(sc.getConf)
    rddOfStatements.mapPartitions(it =>
      connector.withSessionDo { session =>
        // materialize the lazy iterator while the session is still held
        it.map(statement => session.execute(statement)).toList.iterator
      })
    

    But you most likely would want to use executeAsync and handle the futures separately for better performance.
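    The futures-based variant can be sketched in plain Scala; `executeAsync` below is a hypothetical stand-in returning a Scala `Future`, since the real `session.executeAsync` returns a Guava `ListenableFuture` and needs a live cluster (plus a small adapter) to demonstrate:

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Stand-in for session.executeAsync: fire off a statement, get a Future back
    def executeAsync(statement: String): Future[String] =
      Future(s"rows for: $statement")

    val statements = Seq("SELECT * FROM ks.t1", "SELECT * FROM ks.t2")

    // Launch all statements first, then gather the results in one place,
    // instead of blocking on each statement in turn
    val results: Seq[String] =
      Await.result(Future.sequence(statements.map(executeAsync)), 10.seconds)
    // results holds one entry per statement, in order
    ```

    The point of the pattern is that all statements are in flight concurrently before anything blocks.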

    Programmatically specifying columns in cassandraTable

    The select method takes ColumnRef*, which means you need to pass in some number of ColumnRefs. Normally there is an implicit conversion from String to ColumnRef, which is why you can pass in just varargs of Strings.

    Here it's a little more complicated because we want to pass varargs of another type, so we end up needing two implicit conversions in a row, and Scala doesn't do that.
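    The failure mode can be reproduced in isolation with stand-in types (`ColRef` and `select` here are hypothetical, mimicking the connector's ColumnRef and select signature):

    ```scala
    import scala.language.implicitConversions

    // Hypothetical stand-ins for the connector's ColumnRef and select(),
    // just to show the varargs + implicit-conversion behavior
    case class ColRef(name: String)
    implicit def stringToColRef(s: String): ColRef = ColRef(s)

    def select(refs: ColRef*): Seq[String] = refs.map(_.name)

    select("id", "txt")                 // OK: each String is converted implicitly
    val columns = Seq("id", "txt")
    // select(columns: _*)              // does not compile: Seq[String] is not Seq[ColRef]
    select(columns.map(ColRef(_)): _*)  // OK: convert explicitly, then expand with :_*
    ```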

    So instead we pass in ColumnName objects as varargs (:_*)

    ========================================
     Keyspace: test
    ========================================
     Table: dummy
    ----------------------------------------
     - id                      : java.util.UUID                                                                   (partition key column)
     - txt                     : String
    
    
    val columns = Seq("id", "txt")
    columns: Seq[String] = List(id, txt)
    
    //Convert the strings to ColumnNames (a subclass of ColumnRef) and treat as var args
    sc.cassandraTable("test","dummy")
      .select(columns.map(ColumnName(_)):_*)
      .collect      
    
    Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693, txt: hello world})
    
    //Only use the first column
    sc.cassandraTable("test","dummy")
      .select(columns.map(ColumnName(_)).take(1):_*)
      .collect
    
    Array(CassandraRow{id: 74f25101-75a0-48cd-87d6-64cb381c8693})
    
    //Only use the last column        
    sc.cassandraTable("test","dummy")
      .select(columns.map(ColumnName(_)).takeRight(1):_*)
      .collect
    
    Array(CassandraRow{txt: hello world})
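    Tying this back to the question, the column names still have to be pulled out of the query read from the file. `extractColumns` below is a hypothetical helper that assumes a plain `select c1, c2, ... from ...` statement with bare column names (no expressions, aliases, or functions):

    ```scala
    // Hypothetical helper: pull the column list out of a simple query string.
    // Assumes "select c1, c2, ... from ..." with bare column names only.
    def extractColumns(query: String): Seq[String] = {
      val SelectFrom = """(?is)\s*select\s+(.+?)\s+from\s.*""".r
      query match {
        case SelectFrom(cols) => cols.split(",").map(_.trim).toSeq
        case _                => Seq.empty
      }
    }

    val query = "select col1, col2, col3 from keyspaceName.colFamilyName where somecondition = true"
    val columns = extractColumns(query)
    // columns now holds "col1", "col2", "col3", ready to be passed as
    // .select(columns.map(ColumnName(_)): _*)
    ```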