I am extracting data from Cassandra (3.10) into Spark 2 using the spark-cassandra-connector (latest version, I think 2.0.3). I want to know if there is a way to extract rows for multiple primary keys in one query. The PKs are atomic (a single field defines the key), but they are hashes, so I cannot do range queries. Is there a way to take a set of keys such as

val set = Set(hash1, hash2, hash3)

and extract all matching rows in one call, e.g.

sc.cassandraTable("keyspace", "table").where("id = ?", set)

or something to this effect?
I am using Scala with Spark.
Edit:
One suggestion was to use the following approach:

sc.cassandraTable("keyspace", "table").where("id in ?", set)
The problem with this approach is that when I try to use the data extracted this way, I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8463.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8463.0 (TID 25187, worker2, executor 28): java.io.IOException: Exception during preparation of SELECT "sha256", "id", "label", "label_version", "data" FROM "keyspace"."data" WHERE sha256 in ? ALLOW FILTERING: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object [Ljava.lang.String;@2e5a7dfc of type class [Ljava.lang.String; to java.util.ArrayList[AnyRef].
at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:43)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$34.applyOrElse(TypeConverter.scala:671)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:659)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter$$anonfun$convertPF$35.applyOrElse(TypeConverter.scala:875)
at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:41)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.com$datastax$spark$connector$types$NullableTypeConverter$$super$convert(TypeConverter.scala:858)
at com.datastax.spark.connector.types.NullableTypeConverter$class.convert(TypeConverter.scala:54)
at com.datastax.spark.connector.types.TypeConverter$OptionToNullConverter.convert(TypeConverter.scala:858)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:316)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:315)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:683)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:682)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:315)
... 29 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
... 60 elided
So for a partition/primary key this command is problematic because it requires ALLOW FILTERING. Although, doesn't the connector append that clause to most of the CQL statements it generates anyway? Even so, I have seen examples of this kind of IN query working on partition keys in plain CQL (outside the connector). So is this something the connector does not support (yet), or a deliberate design decision? Any ideas on how to fix this, or how to perform my required query in a different way?
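For reference, the equivalent query does run when issued as plain CQL through a session. A minimal sketch, assuming placeholder hash values (the withSessionDo route is my own illustration, not part of the original attempt):

import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

// An IN list on the partition key is valid in plain CQL.
CassandraConnector(sc.getConf).withSessionDo { session =>
  val rows = session.execute(
    "SELECT id, label FROM keyspace.table WHERE id IN ('hash1', 'hash2', 'hash3')")
  rows.iterator().asScala.foreach(row => println(row.getString("id")))
}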
sc.cassandraTable("keyspace","table").where("id in ?",set)
is the correct approach but the set needs to be an iterable, List or Seq, not an array (which is what I had and raised the error).
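A minimal sketch of the working version, again with placeholder hash values:

import com.datastax.spark.connector._

// A Seq (or List) binds correctly to the "in ?" placeholder;
// an Array raises the TypeConversionException shown above.
val hashes: Seq[String] = Seq("hash1", "hash2", "hash3")

val rdd = sc.cassandraTable("keyspace", "table").where("id in ?", hashes)
rdd.take(10).foreach(println)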