Search code examples
scalatwitterplayframeworktwitter-streaming-apiplayframework-2.5

Playframework and Twitter Streaming API


How to read response data from Twitter Streaming API - POST statuses/filter ? I have established connection and I receive 200 status code, but I don't know how to read tweets. I just want to println tweets as they coming.

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response =>
  if(response.headers.status == 200)
    println(response.body)
} 

EDIT: I found this solution

ws.url(url)
.sign(OAuthCalculator(consumerKey, requestToken))
.withMethod("POST")
.stream()
.map { response => 
  if(response.headers.status == 200){
    response.body
      .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
      .filter(_.contains("\r\n"))
      .map(json => Try(parse(json).extract[Tweet]))
      .runForeach {
        case Success(tweet) =>
          println("-----")
          println(tweet.text)
        case Failure(e) =>
          println("-----")
          println(e.getStackTrace)
      }
  }
}

Solution

  • The body of the response for a streaming WS request is an Akka Streams Source of bytes. Since Twitter Api responses are newline delimited (usually) you can use Framing.delimiter to split them up into byte chunks, parse the chunks to JSON, and do what you want with them. Something like this should work:

    import akka.stream.scaladsl.Framing
    import scala.util.{Success, Try}
    import akka.util.ByteString
    import play.api.libs.json.{JsSuccess, Json, Reads}
    import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken}
    
    case class Tweet(id: Long, text: String)
    object Tweet {
      implicit val reads: Reads[Tweet] = Json.reads[Tweet]
    }
    
    def twitter = Action.async { implicit request =>
      ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016")
          .sign(OAuthCalculator(consumerKey, requestToken))
          .withMethod("POST")
          .stream().flatMap { response =>
        response.body
          // Split up the byte stream into delimited chunks. Note
          // that the chunks are quite big
          .via(Framing.delimiter(ByteString.fromString("\n"), 20000))
          // Parse the chunks into JSON, and then to a Tweet.
          // A better parsing strategy would be to account for all
          // the different possible responses, but here we just
          // collect those that match a Tweet.
          .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet]))
          .collect {
            case Success(JsSuccess(tweet, _)) => tweet.text
          }
          // Print out each chunk
          .runForeach(println).map { _ =>
            Ok("done")
        }
      }
    }
    

    Note: to materialize the stream you'll need to inject an implicit Materializer into your controller.