NoSuchMethodError in shapeless seen only in Spark

I am trying to write a Spark connector to pull AVRO messages off a RabbitMQ message queue. When decoding the AVRO messages, there is a NoSuchMethodError error that occurs only when running in Spark.

I could not reproduce the Spark code exactly outside of spark, but I believe the two examples are sufficiently similar. I think this is the smallest code that reproduces the same scenario.

I've removed all the connection parameters both because the information is private and the connection does not appear to be the issue.

Spark code:

package simpleexample

import org.apache.spark.SparkConf
import org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDistributedKey
import org.apache.spark.streaming.rabbitmq.models.ExchangeAndRouting
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import com.sksamuel.avro4s._

import{ByteArrayInputStream, ByteArrayOutputStream}
import com.rabbitmq.client.QueueingConsumer.Delivery
import java.util.HashMap

case class AttributeTuple(attrName: String, attrValue: String)

// AVRO Schema for Events

case class DeviceEvent(
    tenantName: String, 
    groupName: String, 
    subgroupName: String, 
    eventType: String, 
    eventSource: String,
    deviceTypeName: String,
    deviceId: Int,
    timestamp: Long,
    attribute: AttributeTuple

object RabbitMonitor {
  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RabbitMonitor")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    def parseArrayEvent(delivery: Delivery): Seq[DeviceEvent] = {
        val in = new ByteArrayInputStream(delivery.getBody())
        val input = AvroInputStream.binary[DeviceEvent](in)

    val params: Map[String, String] = Map(
        /* many rabbit connection parameters */
        "maxReceiveTime" -> "60000" // 60s

    val distributedKey = Seq(
          /* queue name */, 
          new ExchangeAndRouting(/* exchange name */, /* routing key */),

    var events = RabbitMQUtils.createDistributedStream[Seq[DeviceEvent]](ssc, distributedKey, params, parseArrayEvent)


Non-Spark code:

package simpleexample

import com.thenewmotion.akka.rabbitmq._
// avoid name collision with rabbitmq channel
import scala.concurrent.{Channel => BasicChannel}

import com.sksamuel.avro4s._
import{ByteArrayInputStream, ByteArrayOutputStream}

object Test extends App {
    implicit val system = ActorSystem()

    val factory = new ConnectionFactory()
    /* Set connection parameters*/
    val exchange: String = /* exchange name */

    val connection: ActorRef = system.actorOf(ConnectionActor.props(factory), "rabbitmq")

    def setupSubscriber(channel: Channel, self: ActorRef) {
        val queue = channel.queueDeclare().getQueue
        channel.queueBind(queue, exchange, /* routing key */)
        val consumer = new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
            val in = new ByteArrayInputStream(body)
            val input = AvroInputStream.binary[DeviceEvent](in)
            val result = input.iterator.toSeq

        channel.basicConsume(queue, true, consumer)

    connection ! CreateChannel(ChannelActor.props(setupSubscriber), Some("eventSubscriber"))

    scala.concurrent.Future {
        def loop(n: Long) {
            if (n < 30) {
                loop(n + 1)

Non-Spark Output (the last line is a successfully decoded update):

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ scala project.jar 
[INFO] [03/02/2017 14:11:06.899] [] [akka://default/deadLetters] Message [com.thenewmotion.akka.rabbitmq.ChannelCreated] from Actor[akka://default/user/rabbitmq#-889215077] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/02/2017 14:11:07.337] [] [akka://default/user/rabbitmq] akka://default/user/rabbitmq connected to amqp://<rabbit info>
[INFO] [03/02/2017 14:11:07.509] [] [akka://default/user/rabbitmq/eventSubscriber] akka://default/user/rabbitmq/eventSubscriber connected
Stream(DeviceEvent(int,na,d01,deviceAttrUpdate,device,TestDeviceType,33554434,1488492704421,AttributeTuple(temperature,60)), ?)

Spark Output:

drex@drexThinkPad:~/src/scala/so-repro/connector/target/scala-2.11$ spark-submit ./project.jar --class RabbitMonitor
Using Spark's default log4j profile: org/apache/spark/
17/03/02 14:20:15 INFO SparkContext: Running Spark version 2.1.0
17/03/02 14:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/02 14:20:16 WARN Utils: Your hostname, drexThinkPad resolves to a loopback address:; using instead (on interface wlp3s0)
17/03/02 14:20:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/02 14:20:16 INFO SecurityManager: Changing view acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls to: drex
17/03/02 14:20:16 INFO SecurityManager: Changing view acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: Changing modify acls groups to: 
17/03/02 14:20:16 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(drex); groups with view permissions: Set(); users  with modify permissions: Set(drex); groups with modify permissions: Set()
17/03/02 14:20:16 INFO Utils: Successfully started service 'sparkDriver' on port 34701.
17/03/02 14:20:16 INFO SparkEnv: Registering MapOutputTracker
17/03/02 14:20:16 INFO SparkEnv: Registering BlockManagerMaster
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Using for getting topology information
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/03/02 14:20:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-5cbb13bf-78fe-4227-81b3-1afea40f899a
17/03/02 14:20:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/03/02 14:20:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/03/02 14:20:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/03/02 14:20:16 INFO SparkUI: Bound SparkUI to, and started at
17/03/02 14:20:16 INFO SparkContext: Added JAR file:/home/drex/src/scala/so-repro/connector/target/scala-2.11/./project.jar at spark:// with timestamp 1488493216614
17/03/02 14:20:16 INFO Executor: Starting executor ID driver on host localhost
17/03/02 14:20:16 INFO Utils: Successfully started service '' on port 33276.
17/03/02 14:20:16 INFO NettyBlockTransferService: Server created on
17/03/02 14:20:16 INFO BlockManager: Using for block replication policy
17/03/02 14:20:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver,, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMasterEndpoint: Registering block manager with 366.3 MB RAM, BlockManagerId(driver,, 33276, None)
17/03/02 14:20:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver,, 33276, None)
17/03/02 14:20:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver,, 33276, None)
17/03/02 14:20:17 INFO RabbitMQDStream: Duration for remembering RDDs set to 60000 ms for org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO RabbitMQDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Storage level = Memory Deserialized 1x Replicated
17/03/02 14:20:17 INFO RabbitMQDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO RabbitMQDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO RabbitMQDStream: Initialized and validated org.apache.spark.streaming.rabbitmq.distributed.RabbitMQDStream@546621c4
17/03/02 14:20:17 INFO ForEachDStream: Slide time = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/03/02 14:20:17 INFO ForEachDStream: Checkpoint interval = null
17/03/02 14:20:17 INFO ForEachDStream: Remember interval = 60000 ms
17/03/02 14:20:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@49c6ddef
17/03/02 14:20:17 INFO RecurringTimer: Started timer for JobGenerator at time 1488493260000
17/03/02 14:20:17 INFO JobGenerator: Started JobGenerator at 1488493260000 ms
17/03/02 14:20:17 INFO JobScheduler: Started JobScheduler
17/03/02 14:20:17 INFO StreamingContext: StreamingContext started
17/03/02 14:21:00 INFO JobScheduler: Added jobs for time 1488493260000 ms
17/03/02 14:21:00 INFO JobScheduler: Starting job streaming job 1488493260000 ms.0 from job set of time 1488493260000 ms
17/03/02 14:21:00 INFO SparkContext: Starting job: print at RabbitMonitor.scala:94
17/03/02 14:21:00 INFO DAGScheduler: Got job 0 (print at RabbitMonitor.scala:94) with 1 output partitions
17/03/02 14:21:00 INFO DAGScheduler: Final stage: ResultStage 0 (print at RabbitMonitor.scala:94)
17/03/02 14:21:00 INFO DAGScheduler: Parents of final stage: List()
17/03/02 14:21:00 INFO DAGScheduler: Missing parents: List()
17/03/02 14:21:00 INFO DAGScheduler: Submitting ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93), which has no missing parents
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 366.3 MB)
17/03/02 14:21:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1752.0 B, free 366.3 MB)
17/03/02 14:21:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on (size: 1752.0 B, free: 366.3 MB)
17/03/02 14:21:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/03/02 14:21:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RabbitMQRDD[0] at createDistributedStream at RabbitMonitor.scala:93)
17/03/02 14:21:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/03/02 14:21:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7744 bytes)
17/03/02 14:21:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/03/02 14:21:00 INFO Executor: Fetching spark:// with timestamp 1488493216614
17/03/02 14:21:00 INFO TransportClientFactory: Successfully created connection to / after 23 ms (0 ms spent in bootstraps)
17/03/02 14:21:00 INFO Utils: Fetching spark:// to /tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/fetchFileTemp2710654534934784726.tmp
17/03/02 14:21:00 INFO Executor: Adding file:/tmp/spark-92b6ff6a-b120-4fd0-ba46-a450eff80636/userFiles-c0a334f3-68fc-495f-8ccd-cfe90e6d0bf8/project.jar to class loader
17/03/02 14:21:02 INFO RabbitMQRDD: Receiving data in Partition 0 from  
17/03/02 14:21:50 WARN BlockManager: Putting block rdd_0_0 failed due to an exception
17/03/02 14:21:50 WARN BlockManager: Block rdd_0_0 could not be removed as it was not found on disk or in memory
17/03/02 14:21:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.executor.Executor$
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
17/03/02 14:21:50 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError:;)Lshapeless/Lazy;
    at com.sksamuel.avro4s.SchemaFor$.recordBuilder(SchemaFor.scala:447)
    at simpleexample.RabbitMonitor$$anon$3.<init>(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$.simpleexample$RabbitMonitor$$parseArrayEvent$1(RabbitMonitor.scala:70)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at simpleexample.RabbitMonitor$$anonfun$15.apply(RabbitMonitor.scala:93)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator$$anonfun$5.apply(RabbitMQRDD.scala:209)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.processDelivery(RabbitMQRDD.scala:209)
    at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getNext(RabbitMQRDD.scala:194)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.executor.Executor$
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$

17/03/02 14:21:50 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
17/03/02 14:21:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/03/02 14:21:50 INFO TaskSchedulerImpl: Cancelling stage 0


retrieveManaged := true

lazy val sparkVersion = "2.1.0"

scalaVersion in ThisBuild := "2.11.8"

lazy val rabbit = (project in file("rabbit-plugin")).settings(
    name := "Spark Streaming RabbitMQ Receiver",
    homepage := Some(url("")),
    description := "RabbitMQ-Receiver is a library that allows the user to read data with Apache Spark from RabbitMQ.",
    exportJars := true,

    assemblyJarName in assembly := "rabbit.jar",
    test in assembly := {},

    moduleName := "spark-rabbitmq",
    organization := "com.stratio.receive",
    version := "0.6.0",

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
        "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", 
        "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", 
        "com.typesafe.akka" %% "akka-actor" % "2.4.11",
        "com.rabbitmq" % "amqp-client" % "3.6.6",
        "joda-time" % "joda-time" % "2.8.2",
        "com.github.sstone" %% "amqp-client" % "1.5" % Test,
        "org.scalatest" %% "scalatest" % "2.2.2" % Test,
        "org.scalacheck" %% "scalacheck" % "1.11.3" % Test,
        "junit" % "junit" % "4.12" % Test, 
        "com.typesafe.akka" %% "akka-testkit" % "2.4.11" % Test

lazy val root = (project in file("connector")).settings(
    name := "Connector from Rabbit to Kafka queue",
    description := "",
    exportJars := true,

    test in assembly := {},
    assemblyJarName in assembly := "project.jar",

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
        "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
        "com.thenewmotion" %% "akka-rabbitmq" % "3.0.0",
        "org.apache.kafka" % "kafka_2.10" % "",
        "com.sksamuel.avro4s" %% "avro4s-core" % "1.6.4"
) dependsOn rabbit

I am also using assembly to put together a "fat jar" for spark (addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")) and using the command sbt assembly to produce the jar used in both examples above. I'm running Spark 2.1.0.

I'm relatively new to the Spark / Scala ecosystem so hopefully this is a problem with my build settings. It makes no sense that shapeless would be unavailable in Spark.


  • Same issue myself. I just add more details for others facing it.


    Everything works fine till I deploy to cluster. Then I get

    Exception in thread "main" java.lang.NoSuchMethodError: 'shapeless.DefaultSymbolicLabelling shapeless.DefaultSymbolicLabelling$.instance(shapeless.HList)'

    Root Cause

    Following the stacktrace, I know it is related to the circe library. Then I run the dependency (make sure you have addDependencyTreePlugin in your ~/.sbt/1.0/plugins/plugins.sbt file):

    ❯ sbt "whatDependsOn com.chuusai shapeless_2.12"
    [info] welcome to sbt 1.6.2 ( Inc. Java 1.8.0_332)
    [info] com.chuusai:shapeless_2.12:2.3.7 [S]
    [info]   +-io.circe:circe-generic_2.12:0.14.1 [S]
    [info]     +-***

    but if I run the dependency with "provided" scope, I get:

    ❯ sbt provided:"whatDependsOn com.chuusai shapeless_2.12"
    [info] welcome to sbt 1.6.2 ( Inc. Java 1.8.0_332)
    [info] com.chuusai:shapeless_2.12:2.3.3 [S]
    [info]   +-org.scalanlp:breeze_2.12:1.0 [S]
    [info]     +-org.apache.spark:spark-mllib-local_2.12:3.1.3
    [info]     | +-org.apache.spark:spark-graphx_2.12:3.1.3
    [info]     | | +-org.apache.spark:spark-mllib_2.12:3.1.3
    [info]     | |   +-***
    [info]     | |
    [info]     | +-org.apache.spark:spark-mllib_2.12:3.1.3
    [info]     |   +-***
    [info]     |
    [info]     +-org.apache.spark:spark-mllib_2.12:3.1.3
    [info]       +-***

    As you can see, the instance function in version 2.3.7 is not present in version 2.3.3 (it is added in version 2.3.5):

    Didn't work

    Adding the dependency didn't fix my issue.

    val CirceVersion = "0.14.1"
    val ShapelessVersion = "2.3.7" // Circe 0.14.1 uses 2.3.7; Spark 3.1.3 uses 2.3.3
    val SparkVersion = "3.1.3"
    lazy val CirceDeps: Seq[ModuleID] = Seq(
        "io.circe" %% "circe-generic" % CirceVersion,
        /* Shapeless is one of the Spark dependencies. As Spark is provided, it is not included in the uber jar.
         * Adding the dependency explicitly to make sure we have the correct version at run-time
        "com.chuusai" %% "shapeless" % ShapelessVersion

    I keep this in my code just for documentation purpose only.

    What worked

    The main fix is actually to rename Shapeless library (see my comments)the question that I pick the answer

    /** Shapeless is one of the Spark dependencies. At run-time, they clash and Spark's shapeless package takes
      * precedence. It results run-time error as shapeless 2.3.7 and 2.3.3 are not fully compatible.
      * Here, we are are renaming the library so they co-exist in run-time and Spark uses its own version and Circe also
      * uses its own version.
    // noinspection SbtDependencyVersionInspection
    lazy val shadingRules: Def.Setting[Seq[ShadeRule]] =
      assembly / assemblyShadeRules := Seq(
          .rename("shapeless.**" -> "shadeshapless.@1")
          .inLibrary("com.chuusai" % "shapeless_2.12" % Dependencies.ShapelessVersion)
          .rename("shapeless.**" -> "shadeshapless.@1")
          .inLibrary("io.circe" % "circe-generic_2.12" % Dependencies.CirceVersion)

    Update 2022-08-20

    Based on @denis-arnaud comment, here is a simpler version from pureconfig

    assembly / assemblyShadeRules := Seq(ShadeRule.rename("shapeless.**" -> "new_shapeless.@1").inAll)

    I guess the simple one works for most the situations. The more complex one is good for when there are different versions of shapeless in the classpath, and you'd like to rename them in @1, @2, etc.