mongodb scala akka-stream akka-http

Streaming CSV Source with AKKA-HTTP


I am trying to stream data from MongoDB using reactivemongo-akkastream 0.12.1 and return the result as a CSV stream from one of my routes (using Akka HTTP). I implemented it following the example here:

http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-csv-streaming-example

and it seems to work fine.

The only problem I am facing now is how to add a header row to the output CSV file. Any ideas?

Thanks


Solution

  • That example isn't a robust way to generate CSV, since it doesn't escape field values properly. That aside, you'll need to rework it a bit to add headers. Here's what I would do:

    1. make a Flow to convert a Source[Tweet] to a source of CSV rows, e.g. a Source[List[String]]
    2. concatenate it to a source containing your headers as a single List[String]
    3. adapt the marshaller to render a source of rows rather than tweets

    Here's some example code:

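    import akka.NotUsed
    import akka.http.scaladsl.common.EntityStreamingSupport
    import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
    import akka.http.scaladsl.model.ContentTypes
    import akka.http.scaladsl.server.Directives._
    import akka.stream.scaladsl.{Flow, Source}
    import akka.util.ByteString
    
    case class Tweet(uid: String, txt: String)
    
    // placeholder: supply your own source of tweets (e.g. from MongoDB)
    def getTweets: Source[Tweet, NotUsed] = ???
    
    // convert each tweet to a row of CSV fields
    val tweetToRow: Flow[Tweet, List[String], NotUsed] =
      Flow[Tweet].map { t =>
        List(
          t.uid,
          // crude workaround: replaces commas instead of escaping them
          t.txt.replaceAll(",", "."))
      }
    
    // provide a marshaller from a row (List[String]) to a ByteString
    implicit val rowAsCsv = Marshaller.strict[List[String], ByteString] { row =>
      Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () =>
        ByteString(row.mkString(","))
      )
    }
    
    // enable csv streaming
    implicit val csvStreaming = EntityStreamingSupport.csv()
    
    val route = path("tweets") {
      // the header row is emitted first, followed by the tweet rows
      val headers = Source.single(List("uid", "text"))
      val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
      complete(headers.concat(tweets))
    }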
    

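    Update: if your getTweets method returns a Future (for example a Future[Source[Tweet, NotUsed]]), you can map over the future and prepend the headers to the source it yields. Note that Future.map needs an implicit ExecutionContext in scope, e.g.: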

    val route = path("tweets") {
      val headers = Source.single(List("uid", "text"))
      val rows: Future[Source[List[String], NotUsed]] = getTweets
          .map(tweets => headers.concat(tweets.via(tweetToRow)))
      complete(rows)
    }
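
    On the escaping point: the replaceAll(",", ".") call in tweetToRow avoids broken rows by altering the data rather than escaping it. Below is a minimal sketch of field escaping, assuming RFC 4180-style quoting is what the consumer of the file expects. The CsvEscape object and its escapeField and renderRow helpers are hypothetical names used for illustration, not part of Akka HTTP:

    ```scala
    object CsvEscape {
      // Quote a field only when it contains a comma, double quote, CR or LF,
      // doubling any embedded double quotes (RFC 4180 style).
      def escapeField(field: String): String =
        if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
          "\"" + field.replace("\"", "\"\"") + "\""
        else field

      // Render one row as a single CSV line, without a trailing newline.
      def renderRow(row: List[String]): String =
        row.map(escapeField).mkString(",")
    }
    ```

    With something like this in place, the marshaller could call CsvEscape.renderRow(row) instead of row.mkString(","), and tweetToRow could pass t.txt through unchanged.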