Spark-Streaming from an Actor

I would like to have a consumer actor subscribe to a Kafka topic and stream data for further processing with Spark Streaming outside the consumer. Why an actor? Because I read that an actor's supervisor strategy is a good way to handle Kafka failures (e.g., restarting the consumer when it fails).

I found two options:

  • The Java KafkaConsumer class: its poll() method returns a Map[String, Object], whereas I would like to get back a DStream, just as KafkaUtils.createDirectStream returns one. I also don't know how to access that stream from outside the actor.
  • Extend the ActorHelper trait and use actorStream(), as shown in this example. However, that example connects to a socket rather than to a Kafka topic.

Could anyone point me in the right direction?
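The hand-off the question is after (records produced inside an actor, then consumed outside it in batches, roughly the way a DStream groups incoming data into micro-batches) can be sketched with the JDK's LinkedBlockingQueue. This is a hypothetical, dependency-free illustration only: `MicroBatchBuffer`, `push`, and `nextBatch` are made-up names, and none of this is Spark's receiver or actorStream API.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical hand-off between a producer (e.g. code running inside an actor)
// and a consumer running outside it. NOT Spark's API; it only illustrates
// the micro-batch idea behind a DStream.
final class MicroBatchBuffer[T](capacity: Int) {
  private val queue = new LinkedBlockingQueue[T](capacity)

  // Producer side: blocks while the buffer is full (crude back-pressure).
  def push(record: T): Unit = queue.put(record)

  // Consumer side, called once per "batch interval": takes up to maxBatch
  // records that are already buffered, without waiting for more.
  def nextBatch(maxBatch: Int): Vector[T] = {
    val out = new java.util.ArrayList[T]()
    queue.drainTo(out, maxBatch)
    (0 until out.size()).map(i => out.get(i)).toVector
  }
}
```

In the ActorHelper approach, the actor instead hands each record to Spark via `store(...)` and Spark does the batching itself; the queue above just makes the producer/consumer split explicit.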


  • For handling Kafka failures, I used the Apache Curator framework and the following workaround:

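    import org.apache.curator.CuratorZookeeperClient
    import org.apache.curator.framework.CuratorFramework
    import scala.util.Try

    val client: CuratorFramework = ... // see docs
    val zk: CuratorZookeeperClient = client.getZookeeperClient

    /** Returns false if Kafka or ZooKeeper is down. */
    def isKafkaAvailable: Boolean =
      Try {
        if (zk.isConnected) {
          // live brokers register themselves under /brokers/ids
          val xs = client.getChildren.forPath("/brokers/ids")
          xs.size() > 0
        } else false
      }.getOrElse(false) // any ZooKeeper exception counts as "unavailable"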
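Because the real check needs a live ZooKeeper, here is a hypothetical, dependency-free variant of the same logic in which the two ZooKeeper calls are passed in as functions, so it can be exercised without a cluster. It keeps the intended behaviour of `isKafkaAvailable`: not connected, no registered brokers, or any exception all count as unavailable. The object and parameter names are made up for illustration.

```scala
import scala.util.Try

// Hypothetical, testable version of isKafkaAvailable: the ZooKeeper calls
// are injected as functions instead of going through Curator directly.
object KafkaHealth {
  def isKafkaAvailable(isConnected: () => Boolean,
                       brokerIds: () => Seq[String]): Boolean =
    Try {
      // && short-circuits, so broker ids are only fetched when connected;
      // an empty list means no broker is currently registered.
      isConnected() && brokerIds().nonEmpty
    }.getOrElse(false) // any exception (e.g. ZooKeeper unreachable) means "down"
}
```

With Curator, the arguments would be `() => zk.isConnected` and a function that converts the result of `client.getChildren.forPath("/brokers/ids")` into a Scala `Seq`.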

    For consuming Kafka topics, I used the com.softwaremill.reactivekafka library. For example:

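    import akka.actor.Actor
    import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    class KafkaConsumerActor extends Actor {
      val kafka = new ReactiveKafka()
      val config: ConsumerProperties[Array[Byte], Any] = ... // see docs
      override def preStart(): Unit = {
        val publisher = kafka.consume(config)
        // ... connect `publisher` to a stream that calls handleKafkaRecord (see docs)
      }
      /** Invoked whenever a Kafka record arrives. */
      def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]): Unit = {
        // handle record
      }
      def receive: Receive = Actor.emptyBehavior
    }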
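One way to combine the two pieces is to restart consumption after a failure only once the availability check reports Kafka as up again. The sketch below does this in plain Scala as a rough stand-in for what an Akka supervisor's Restart directive would do around the consumer actor. `RestartPolicy`, `runWithRestarts`, and its parameters are hypothetical names, and the fixed polling delay is an assumption (a real setup might use exponential backoff).

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical restart policy: run `consume`; if it fails, wait until
// `kafkaAvailable` reports true, then run it again. Gives up (rethrows)
// after maxRestarts restarts. Returns how many restarts were needed.
object RestartPolicy {
  def runWithRestarts(consume: () => Unit,
                      kafkaAvailable: () => Boolean,
                      maxRestarts: Int,
                      pollMillis: Long): Int = {
    @tailrec
    def loop(restarts: Int): Int =
      Try(consume()) match {
        case Success(_) => restarts
        case Failure(e) if restarts >= maxRestarts => throw e
        case Failure(_) =>
          // block until the health check passes, then restart
          while (!kafkaAvailable()) Thread.sleep(pollMillis)
          loop(restarts + 1)
      }
    loop(0)
  }
}
```

Blocking with `Thread.sleep` is acceptable in a standalone sketch, but it should not happen inside an actor's message handler; with Akka, the retry would be scheduled rather than waited for.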