scala, spark-streaming, akka-stream, akka-http

Idiomatic way to use Spark DStream as Source for an Akka stream


I'm building a REST API that starts some calculation in a Spark cluster and responds with a chunked stream of the results. Given the Spark stream with calculation results, I can use

dstream.foreachRDD()

to send the data out of Spark. I'm sending the chunked HTTP response with akka-http:

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
    HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
}

For simplicity, I'm trying to get plain text working first, will add JSON marshalling later.

But what is the idiomatic way of using the Spark DStream as a Source for the Akka stream? I figured I should be able to do it via a socket but since the Spark driver and the REST endpoint are sitting on the same JVM opening a socket just for this seems a bit of an overkill.


Solution

  • Edit: This answer only applies to older versions of Spark and Akka. PH88's answer is the correct method for recent versions.

    You can use an intermediate akka.actor.Actor that feeds a Source (similar to this question). The solution below is not "reactive" because the underlying Actor would need to maintain a buffer of RDD messages that may be dropped if the downstream HTTP client isn't consuming chunks quickly enough. But this problem occurs regardless of the implementation details, since you cannot connect the "throttling" of akka-stream back-pressure to the DStream in order to slow down the data: DStream does not implement org.reactivestreams.Publisher.

    The basic topology is:

    DStream --> Actor with buffer --> Source
    
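    The buffering problem mentioned above can be sketched without any Akka or Spark dependencies: the intermediate actor essentially wraps a bounded queue that drops data when the HTTP client consumes chunks more slowly than the DStream produces them. The class and method names below are illustrative, not taken from the linked JobManager:

    ```scala
    import scala.collection.mutable

    // Illustrative bounded buffer with drop-oldest overflow semantics,
    // modeling what the intermediate actor must do because the DStream
    // side cannot participate in akka-stream back-pressure.
    final class DropOldestBuffer[A](capacity: Int) {
      private val queue = mutable.Queue.empty[A]

      // Called for every message arriving from the DStream side.
      def offer(msg: A): Unit = {
        if (queue.size == capacity) queue.dequeue() // oldest message is lost
        queue.enqueue(msg)
      }

      // Called when the downstream (HTTP client) demands the next chunk.
      def poll(): Option[A] =
        if (queue.isEmpty) None else Some(queue.dequeue())

      def size: Int = queue.size
    }
    ```

    With a capacity of 3, offering the messages 1 through 5 drops 1 and 2; polling then yields 3, 4, 5 in order. This is exactly the data loss the "not reactive" caveat refers to.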

    To construct this topology you have to create an Actor similar to the implementation here:

    //JobManager definition is provided in the link
    val actorRef = actorSystem actorOf JobManager.props 
    

    Create a stream Source of ByteStrings (messages) based on the JobManager. Also, convert each ByteString to an HttpEntity.ChunkStreamPart, which is what the HttpResponse requires:

    import akka.stream.actor.ActorPublisher
    import akka.stream.scaladsl.{Flow, Source}
    import akka.http.scaladsl.model.HttpEntity
    import akka.util.ByteString
    
    type Message = ByteString
    
    val messageToChunkPart = 
      Flow[Message].map(HttpEntity.ChunkStreamPart(_))
    
    //Actor with buffer --> Source
    val source : Source[HttpEntity.ChunkStreamPart, Unit] = 
      Source(ActorPublisher[Message](actorRef)) via messageToChunkPart
    

    Link the Spark DStream to the Actor so that each incoming RDD is converted to an Iterable of ByteString and then forwarded to the Actor:

    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.rdd.RDD
    
    val dstream : DStream[String] = ???
    
    //This function converts your RDDs to messages being sent
    //via the http response
    def rddToMessages[T](rdd : RDD[T]) : Iterable[Message] = ???
    
    def sendMessageToActor(message : Message) = actorRef ! message
    
    //DStream --> Actor with buffer
    dstream foreachRDD { rdd => rddToMessages(rdd) foreach sendMessageToActor }
    
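    One hypothetical shape for rddToMessages is to collect the RDD's records to the driver (reasonable here, since the driver is already serving the HTTP response) and encode each record as a newline-terminated message. The encoding step is sketched below on plain Scala collections so it stands alone; plain String stands in for ByteString, and the names are illustrative:

    ```scala
    // Illustrative record-to-message encoding for the chunked
    // text/plain response: one newline-terminated line per record.
    // In the real code each line would be wrapped in akka.util.ByteString
    // before being sent to the actor.
    def recordsToMessages[T](records: Iterable[T]): Iterable[String] =
      records.map(record => record.toString + "\n")

    // Hypothetical Spark-side wrapper (not runnable without Spark):
    //   def rddToMessages[T](rdd: RDD[T]): Iterable[Message] =
    //     recordsToMessages(rdd.collect()).map(ByteString(_))
    ```

    Note that rdd.collect() pulls the whole RDD onto the driver, so this only suits result sets small enough to buffer there.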

    Provide the Source to the HttpResponse:

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(HttpMethods.GET, Uri.Path("/data"), _, _, _) =>
        HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain`, source))
    }
    

    Note: there should be very little time/code between the dstream foreachRDD line and the HttpResponse, since the Actor's internal buffer will immediately begin to fill with ByteString messages coming from the DStream once the foreachRDD line is executed.