apache-spark kryo accumulo

Read from Accumulo with Spark Shell


I am trying to use the Spark shell to connect to an Accumulo table.

I load Spark and the libraries I need like this:

$ bin/spark-shell --jars /data/bigdata/installs/accumulo-1.7.2/lib/accumulo-fate.jar,/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-core.jar,/data/bigdata/installs/accumulo-1.7.2/lib/accumulo-trace.jar,/data/bigdata/installs/accumulo-1.7.2/lib/htrace-core.jar,/data/bigdata/installs/accumulo-1.7.2/lib/libthrift.jar

I then paste the following into the shell:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration

import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, AccumuloInputFormat, InputFormatBase}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.security.Authorizations

import org.apache.spark.{SparkConf, SparkContext}


val user       = "root"
val tableName  = "hd_history"
val instanceName = "GISCIENCE"
val zooKeepers = "localhost:2181"
val token = new PasswordToken("***")

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//conf.registerKryoClasses(Array(classOf[org.apache.accumulo.core.data.Key],classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],classOf[org.apache.accumulo.core.data.Value],classOf[org.apache.spark.api.java.JavaSparkContext]))
val sc = new SparkContext(conf)

val jobConf = new JobConf() // Create a job conf

// Configure the job conf with accumulo properties
AbstractInputFormat.setConnectorInfo(jobConf, user, token)
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AbstractInputFormat.setZooKeeperInstance(jobConf, clientConfig)
InputFormatBase.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
  classOf[org.apache.accumulo.core.data.Key], 
  classOf[org.apache.accumulo.core.data.Value]
  ) 

When I call rdd2.count(), I get:

16/07/18 18:30:43 INFO spark.SparkContext: Starting job: count at <console>:38
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Got job 1 (count at <console>:38) with 1 output partitions
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (count at <console>:38)
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Missing parents: List()
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35), which has no missing parents
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1776.0 B, free 148.9 KB)
16/07/18 18:30:43 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1110.0 B, free 150.0 KB)
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:39461 (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (NewHadoopRDD[0] at newAPIHadoopRDD at <console>:35)
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/07/18 18:30:43 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2284 bytes)
16/07/18 18:30:43 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
16/07/18 18:30:43 INFO rdd.NewHadoopRDD: Input split: Range: (-inf,+inf) Locations: [localhost] Table: hd_history TableID: 8 InstanceName: GISCIENCE zooKeepers: localhost:2181 principal: root tokenSource: INLINE authenticationToken: org.apache.accumulo.core.client.security.tokens.PasswordToken@77db28e3 authenticationTokenFile: null Authorizations:  offlineScan: false mockInstance: false isolatedScan: false localIterators: false fetchColumns: [] iterators: [] logLevel: INFO
16/07/18 18:30:43 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 2082 bytes result sent to driver
16/07/18 18:30:43 ERROR scheduler.TaskResultGetter: Exception while getting task result
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/07/18 18:30:43 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
16/07/18 18:30:43 INFO scheduler.DAGScheduler: ResultStage 1 (count at <console>:38) failed in 0.029 s
16/07/18 18:30:43 INFO scheduler.DAGScheduler: Job 1 failed: count at <console>:38, took 0.040014 s
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 2
16/07/18 18:30:43 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on localhost:39461 in memory (size: 1110.0 B, free: 487.9 MB)
16/07/18 18:30:43 INFO spark.ContextCleaner: Cleaned accumulator 1
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
  ... 48 elided

It is not clear to me which classes I have to register with Kryo (i.e. how to find out which class belongs to the referenced ID 13994), or whether this really is the problem.


Solution

  • The problem was that I created a second SparkContext in addition to the one that spark-shell already provides (as sc) on startup.

    Skipping

    val sc = new SparkContext(conf)

    (among other lines) in the code above solves the problem.
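
For reference, a corrected session could look like the sketch below. It reuses the question's connection values and configuration calls unchanged; the only substantive difference is that it relies on the `sc` that spark-shell creates instead of constructing a new SparkContext. If Kryo is still wanted, one option (an assumption, not something the original poster reported testing) is to pass `--conf spark.serializer=org.apache.spark.serializer.KryoSerializer` when launching the shell, so that the shell's own context is configured with it from the start.

```scala
// Paste into spark-shell. The shell has already created a SparkContext named `sc`,
// so no SparkConf or SparkContext is constructed here.
import org.apache.hadoop.mapred.JobConf
import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapred.{AbstractInputFormat, InputFormatBase}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.accumulo.core.security.Authorizations

// Connection values from the question; the password is elided there as well.
val user         = "root"
val tableName    = "hd_history"
val instanceName = "GISCIENCE"
val zooKeepers   = "localhost:2181"
val token        = new PasswordToken("***")

// Show which serializer the shell's context uses
// (prints the fallback label if the key was never set).
println(sc.getConf.get("spark.serializer", "not set (Spark default)"))

// Configure the Accumulo input format exactly as in the question.
val jobConf = new JobConf()
AbstractInputFormat.setConnectorInfo(jobConf, user, token)
AbstractInputFormat.setScanAuthorizations(jobConf, new Authorizations())
val clientConfig = new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AbstractInputFormat.setZooKeeperInstance(jobConf, clientConfig)
InputFormatBase.setInputTableName(jobConf, tableName)

// Build the RDD on the existing context and count the entries.
val rdd2 = sc.newAPIHadoopRDD(
  jobConf,
  classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
  classOf[Key],
  classOf[Value])

println(rdd2.count())
```

The stack trace shows the failure on the driver while it deserializes the task result, which would be consistent with the two contexts not agreeing on the serializer setup. That reading is an assumption; what the original poster confirmed is only that removing the extra context made the error go away.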