apache-spark cassandra cassandra-3.0 spark-cassandra-connector

Spark full RDD joinWithCassandraTable: java.lang.IllegalArgumentException: requirement failed: Invalid row size: 2 instead of 1


  • I am currently trying to join a Spark DataFrame to a Cassandra table.
  • Unfortunately, we cannot immediately upgrade to the new DataStax connector 2.5.0 and use Direct Joins.
  • So I am trying the RDD approach, using the existing joinWithCassandraTable API (here, its left-join variant leftJoinWithCassandraTable).

Here's my sample code:

Cassandra table definition (my_key_space.cust_table):

custId: text PRIMARY KEY
custName: text
custAddress: text

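import com.datastax.spark.connector._ // leftJoinWithCassandraTable, AllColumns, SomeColumns
import org.apache.spark.sql.Row
import spark.implicits._ // toDF on a Seq; assumes a SparkSession named spark, as in spark-shell

// toDF takes the column names as varargs, not as a single tuple
val testDF = Seq(("event-01", "cust-01"), ("event-02", "cust-02")).toDF("eventId", "custId")

val resultRdd = testDF
    .rdd
    .leftJoinWithCassandraTable(
      keyspaceName = "my_key_space",
      tableName = "cust_table",
      selectedColumns = AllColumns,
      joinColumns = SomeColumns("custId")
    )
    // Each element is (Spark Row, Option[CassandraRow]): append the Cassandra
    // columns as a nested Row, or null when there is no matching customer
    .map { case (sparkRow, cassandraRow) =>
      val resultStruct = cassandraRow
        .map(r => Row.fromSeq(r.columnValues))
        .orNull
      Row.fromSeq(sparkRow.toSeq :+ resultStruct)
    }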
  • This throws java.lang.IllegalArgumentException: requirement failed: Invalid row size: 2 instead of 1.
  • If I restrict testDF to just the custId column, it works fine.
  • Am I making a mistake somewhere? How can I perform the join on the full RDD instead of on a projection containing only the key column?
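The numbers in the message line up with the two observations above: the full row has 2 fields (eventId, custId) while the join is on 1 column (custId), and the key-only projection has exactly 1. The Scala sketch below is a simplified, hypothetical model of that kind of size check, assuming the connector binds the fields of each left-side row to the join columns one-to-one and rejects rows whose field count differs. `JoinKeyModel` and `joinKeyValues` are made-up names for illustration, not connector API.

```scala
// Hypothetical, simplified model of the row-size check; NOT the connector's code.
object JoinKeyModel {
  // Assumption: each field of the left-side row is bound to one join column,
  // so the row must have exactly as many fields as there are join columns.
  def joinKeyValues(row: Seq[Any], joinColumns: Seq[String]): Seq[Any] = {
    require(
      row.size == joinColumns.size,
      s"Invalid row size: ${row.size} instead of ${joinColumns.size}"
    )
    row
  }

  def main(args: Array[String]): Unit = {
    val joinColumns = Seq("custId")

    // Key-only projection (custId): passes the check and prints List(cust-01)
    println(joinKeyValues(Seq("cust-01"), joinColumns))

    // Full row (eventId, custId): require throws IllegalArgumentException and
    // this prints "requirement failed: Invalid row size: 2 instead of 1"
    try joinKeyValues(Seq("event-01", "cust-01"), joinColumns)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Scala's `require` prefixes its message with "requirement failed: ", which is why the model reproduces the exact text reported in the question.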

Solution

  • You need to use .on(SomeColumns("custId")) right after leftJoinWithCassandraTable...

    I have a blog post on efficient joins with Cassandra, and it describes the RDD API as well...