Tags: scala, apache-kafka, spark-streaming, actor

Spark-Streaming from an Actor


I would like a consumer actor to subscribe to a Kafka topic and stream the data out for further processing with Spark Streaming, outside the consumer. Why an actor? Because I read that an actor's supervisor strategy is a good way to handle Kafka failures (e.g., restarting the consumer when it fails).

I found two options:

  • The Java KafkaConsumer class: its poll() method returns a batch of records (a Map[String, Object] in the version I looked at), whereas I would like a DStream to be returned, just as KafkaUtils.createDirectStream does. I also don't know how to get at the stream from outside the actor.
  • Extend the ActorHelper trait and use actorStream(), as shown in this example. However, that example connects to a socket rather than to a Kafka topic.
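Both options come down to the same hand-off: the consuming side pushes each polled record somewhere, and a reader outside the actor pulls from it. In Spark that role is played by a custom Receiver's store() method (or ActorHelper's store() when using actorStream()), and Spark turns the stored records into a DStream. The stdlib-only sketch below illustrates just the hand-off, assuming a bounded in-memory queue; Record and RecordBuffer are hypothetical names, not Kafka or Spark types.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical record type standing in for a Kafka ConsumerRecord.
final case class Record(topic: String, value: String)

// Hypothetical bounded hand-off between the consuming side (an actor or a
// KafkaConsumer.poll loop) and a reader outside it.
final class RecordBuffer(capacity: Int) {
  private val queue = new LinkedBlockingQueue[Record](capacity)

  // Consuming side: blocks while the buffer is full (simple back-pressure).
  def push(r: Record): Unit = queue.put(r)

  // Reading side: waits up to timeoutMs for the first record, then takes
  // whatever else is already queued, up to `max` records in total.
  // Returns an empty Vector if nothing arrived before the timeout.
  def nextBatch(max: Int, timeoutMs: Long): Vector[Record] = {
    require(max > 0, "max must be positive")
    val first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (first == null) Vector.empty
    else {
      val rest = new java.util.ArrayList[Record]()
      queue.drainTo(rest, max - 1)
      val builder = Vector.newBuilder[Record]
      builder += first
      val it = rest.iterator()
      while (it.hasNext) builder += it.next()
      builder.result()
    }
  }
}
```

In a Spark receiver, store() would take the place of push(), and Spark itself would group the stored records into batches instead of a hand-written nextBatch() loop.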

Could anyone point me in the right direction?


Solution

  • To detect Kafka failures, I used the Apache Curator framework (a ZooKeeper client) and the following workaround:

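    import org.apache.curator.CuratorZookeeperClient
    import org.apache.curator.framework.CuratorFramework
    import scala.util.Try

    val client: CuratorFramework = ... // see docs
    val zk: CuratorZookeeperClient = client.getZookeeperClient

    /**
      * Returns false if ZooKeeper is not connected, if no Kafka broker is
      * registered under /brokers/ids, or if the lookup itself throws.
      */
    def isKafkaAvailable: Boolean =
      Try {
        if (zk.isConnected) {
          val xs = client.getChildren.forPath("/brokers/ids")
          xs.size() > 0
        } else false
      }.getOrElse(false)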
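
The check works because each live Kafka broker registers an ephemeral znode under /brokers/ids in ZooKeeper, so an empty child list means no broker is currently registered. To make that decision logic testable without a running ZooKeeper, here is a minimal sketch that passes the two Curator calls in as by-name parameters; KafkaAvailability and isAvailable are hypothetical names, and `connected` / `brokerIds` stand in for zk.isConnected and client.getChildren.forPath("/brokers/ids").

```scala
import scala.util.Try

object KafkaAvailability {
  // Same decision as isKafkaAvailable: true only if ZooKeeper is connected
  // AND at least one broker id is registered. Both arguments are by-name,
  // so an exception thrown while evaluating them is caught by Try.
  // `&&` short-circuits, so brokerIds is never looked up when disconnected.
  def isAvailable(connected: => Boolean, brokerIds: => Seq[String]): Boolean =
    Try(connected && brokerIds.nonEmpty).getOrElse(false)
}
```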
    

    For consuming Kafka topics, I used the com.softwaremill.reactivekafka library. For example:

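    import akka.actor.{Actor, ActorSystem}
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Sink, Source}
    import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
    import org.apache.kafka.clients.consumer.ConsumerRecord

    class KafkaConsumerActor extends Actor {
      // consume() needs an implicit ActorSystem, and running an
      // Akka Streams graph needs a Materializer.
      implicit val system: ActorSystem = context.system
      implicit val materializer: ActorMaterializer = ActorMaterializer()(context)

      val kafka = new ReactiveKafka()
      val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

      override def preStart(): Unit = {
        super.preStart()

        // Wrap the Kafka publisher in an Akka Streams source and
        // process each record as it arrives.
        val publisher = kafka.consume(config)
        Source.fromPublisher(publisher)
          .map(handleKafkaRecord)
          .to(Sink.ignore)
          .run()
      }

      /**
        * Invoked for every record received from the subscribed topic.
        */
      def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]): Unit = {
        // handle record
      }

      // The actor handles no messages itself; all work happens in the stream.
      def receive: Receive = Actor.emptyBehavior
    }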
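
One way to tie the two parts together, which is a swapped-in technique and not what the answer above does, is to wait with exponential backoff until isKafkaAvailable returns true before (re)starting the consumer stream, much as a restarting supervisor would. Akka also provides a BackoffSupervisor in akka.pattern for the actor-level version of this. The sketch below keeps only the retry logic, using the standard library; Backoff and awaitAvailable are hypothetical names, and `sleep` is injectable so the logic can be tested without real waiting.

```scala
import scala.annotation.tailrec

object Backoff {
  // Calls `check` up to maxAttempts times. After each failed attempt it
  // sleeps, doubling the delay each time but never exceeding maxDelayMs.
  // Returns whether the check eventually succeeded and the delays used.
  def awaitAvailable(
      check: () => Boolean,
      maxAttempts: Int,
      initialDelayMs: Long,
      maxDelayMs: Long,
      sleep: Long => Unit = ms => Thread.sleep(ms)
  ): (Boolean, Vector[Long]) = {
    require(maxAttempts > 0 && initialDelayMs > 0 && maxDelayMs >= initialDelayMs)

    @tailrec
    def loop(attempt: Int, delay: Long, delays: Vector[Long]): (Boolean, Vector[Long]) =
      if (check()) (true, delays)
      else if (attempt >= maxAttempts) (false, delays)
      else {
        sleep(delay)
        loop(attempt + 1, math.min(delay * 2, maxDelayMs), delays :+ delay)
      }

    loop(1, initialDelayMs, Vector.empty)
  }
}
```

Inside the actor, `check` could simply be `() => isKafkaAvailable`, called before the stream is started in preStart().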