
Write GeoLocation Twitter4J to Postgres


I am extracting tweets using Twitter4J and Akka Streams. I pick a few fields, such as userId, tweetId and the tweet text, and write the resulting Tweet entity to the database:

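import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import twitter4j.{Status, StatusAdapter}

// Tweet and the Databases trait (assumed to provide tweetRepository) are defined elsewhere.
class Counter extends StatusAdapter with Databases {
  implicit val system = ActorSystem("TweetsExtractor")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher
  implicit val LoggingAdapter =
    Logging(system, classOf[Counter])

  // Buffer up to 1000 statuses and backpressure the producer when the buffer is full.
  val overflowStrategy = OverflowStrategy.backpressure
  val bufferSize = 1000
  val statusSource = Source.queue[Status](
    bufferSize,
    overflowStrategy
  )

  // Map each Twitter4J Status onto the fields stored in the Tweet entity.
  val insertFlow: Flow[Status, Tweet, NotUsed] =
    Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang,
      status.getFavoriteCount, status.getRetweetCount))
  // Persist every Tweet through the repository.
  val insertSink: Sink[Tweet, Future[Done]] = Sink.foreach(tweetRepository.create)
  // `via` and `to` keep the source's materialized value, so run() returns the queue.
  val insertGraph = statusSource via insertFlow to insertSink
  val queueInsert = insertGraph.run()

  // Called by Twitter4J for every incoming tweet; blocks until the queue accepts it.
  override def onStatus(status: Status) =
    Await.result(queueInsert.offer(status), Duration.Inf)
}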

I want to add a location field. Twitter4J has a dedicated GeoLocation type for this, which holds the latitude and longitude as doubles. However, when I extract the latitude and longitude directly in the flow, nothing is written to the database:

Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang,
  status.getFavoriteCount, status.getRetweetCount,
  status.getGeoLocation.getLatitude, status.getGeoLocation.getLongitude))

What could cause this behavior, and how can I fix it?


Solution

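  • As confirmed in the comments to the question, most tweets have no geolocation data attached. For those tweets status.getGeoLocation returns null, so calling getLatitude on it throws a NullPointerException inside the map stage. Under Akka Streams' default supervision strategy that exception fails the whole stream, so no further tweets reach the database.

    Checking for a missing GeoLocation before reading its coordinates, and storing the coordinates as optional (nullable) values, should solve the issue.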
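A minimal sketch of that check, under stated assumptions: the GeoLocation class below is a hypothetical stand-in for twitter4j.GeoLocation so the snippet compiles on its own (in real code, import the Twitter4J class instead), and the extended Tweet case class, its field names, and the GeoFields helper are illustrative, not the asker's actual entity. Wrapping the possibly-null value in Option turns a missing location into None rather than an exception.

```scala
// Hypothetical stand-in for twitter4j.GeoLocation so this sketch compiles without Twitter4J;
// the real class exposes the same getLatitude/getLongitude accessors.
final class GeoLocation(latitude: Double, longitude: Double) {
  def getLatitude: Double = latitude
  def getLongitude: Double = longitude
}

// Hypothetical extension of the question's Tweet entity: the coordinates are optional,
// so tweets without geo data can be stored with NULL latitude/longitude columns.
final case class Tweet(
    tweetId: Long,
    userId: Long,
    text: String,
    lang: String,
    favoriteCount: Int,
    retweetCount: Int,
    latitude: Option[Double],
    longitude: Option[Double]
)

object GeoFields {
  // Option(...) maps a null GeoLocation to None instead of dereferencing it.
  def coordinates(geo: GeoLocation): (Option[Double], Option[Double]) =
    Option(geo) match {
      case Some(g) => (Some(g.getLatitude), Some(g.getLongitude))
      case None    => (None, None)
    }

  // Builds the entity from the same values the question's flow already extracts.
  def tweet(id: Long, userId: Long, text: String, lang: String,
            favorites: Int, retweets: Int, geo: GeoLocation): Tweet = {
    val (lat, lon) = coordinates(geo)
    Tweet(id, userId, text, lang, favorites, retweets, lat, lon)
  }
}
```

In the stream, the flow would then become something like `Flow[Status].map(s => GeoFields.tweet(s.getId, s.getUser.getId, s.getText, s.getLang, s.getFavoriteCount, s.getRetweetCount, s.getGeoLocation))`. Keeping the coordinates as Option[Double] lets the repository write SQL NULL for tweets without a location instead of failing the whole stream; how Option maps to a nullable column depends on the database library behind tweetRepository.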