
continuously fetch database results with scalaz.stream


I'm new to Scala and extremely new to scalaz. With the help of a different Stack Overflow answer and some handholding, I was able to use scalaz.stream to implement a Process that continuously fetches Twitter API results. Now I'd like to do the same thing for the Cassandra DB where the Twitter handles are stored.

The code for fetching the Twitter results is here:

// Build one (handle, request URL) pair per row stored in Cassandra.
def urls: Seq[(Handle, URL)] =
  Await.result(
    getAll(connection).map { rows =>
      rows.map { twitterToGet =>
        (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
      }
    },
    5.seconds)

val fetchUrl = channel.lift[Task, (Handle, URL), Fetched] {
  url => Task.delay {

    val finalResult = callTwitter(url)
    if (finalResult.tweets.nonEmpty) {
      connection.updateTwitter(finalResult)
    } else {
      println("\n" + finalResult.handle + " does not have new tweets")
    }
    s"\ntwitter Fetch & database update completed"

  }
}

val P = Process
val process =
  (time.awakeEvery(3.second) zipWith P.emitAll(urls))((b, url) => url).
    through(fetchUrl)

val fetched = process.runLog.run
fetched.foreach(println)

What I'm planning to do is use

def urls: Seq[(Handle,URL)] = {

to continuously fetch Cassandra results (with an awakeEvery) and send them off to an actor that runs the Twitter-fetching code above.

My question is: what is the best way to implement this with scalaz.stream? Note that I'd like it to get ALL the database results, then wait for a delay before getting ALL the database results again. Should I use the same architecture as the Twitter-fetching code above? If so, how would I create a channel.lift that doesn't require input? Is there a better way in scalaz.stream?

Thanks in advance


Solution

  • I got this working today. The cleanest way to do it would be to emit the database results as a stream and attach a sink to the end of the stream to do the Twitter processing. What I actually have is a bit more complex: it retrieves the database results continuously and sends them off to an actor for the Twitter processing. The way the results are retrieved follows the original code from my question:

    val connection = new simpleClient(conf.getString("cassandra.node"))

    // Scheduler that time.awakeEvery picks up implicitly
    implicit val threadPool = new ScheduledThreadPoolExecutor(4)
    val system = ActorSystem("mySystem")
    val twitterFetch = system.actorOf(Props[TwitterFetch], "twitterFetch")

    // Query Cassandra, hand the results to the actor, and emit a status message
    def myEffect = channel.lift[Task, simpleClient, String] {
      connection: simpleClient => Task.delay {
        val results = Await.result(
          getAll(connection).map { rows =>
            rows.map(twitterToGet =>
              (twitterToGet.handle, urlBoilerPlate + twitterToGet.handle + parameters + twitterToGet.sinceID)
            )
          },
          5.seconds)

        println("Query Successful, results= " + results + " at " + format.print(System.currentTimeMillis()))

        twitterFetch ! fetched(connection, results)
        "database fetch completed"
      }
    }

    val P = Process
    // On every tick, emit the connection and run the database fetch against it
    val process =
      time.awakeEvery(3.second).flatMap(_ => P.emit(connection).through(myEffect))

    val fetching = process.runLog.run
    fetching.foreach(println)
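
    For comparison, the simpler shape mentioned at the start of this answer (a stream of database rows with a sink on the end) might look roughly like the sketch below. It is not the code I run: fetchAll, twitterSink, and the example row are hypothetical stand-ins for the Cassandra query and the Twitter call, and it assumes the same scalaz.stream and scalaz 7.1 style API as the code above (where Task has a run method).

    ```scala
    import java.util.concurrent.{ScheduledExecutorService, ScheduledThreadPoolExecutor}
    import scala.concurrent.duration._
    import scalaz.concurrent.Task
    import scalaz.stream.{Process, Sink, sink, time}

    object PollingSketch {
      type Handle = String
      type URL    = String

      // time.awakeEvery needs an implicit scheduler, as in the code above.
      implicit val scheduler: ScheduledExecutorService = new ScheduledThreadPoolExecutor(1)

      // Hypothetical stand-in for the Cassandra query (getAll in the question).
      def fetchAll: Task[Seq[(Handle, URL)]] =
        Task.delay(Seq("alice" -> "https://example.invalid/alice"))

      // On every tick, run the query and emit each row as its own element.
      def rows(every: FiniteDuration): Process[Task, (Handle, URL)] =
        time.awakeEvery(every)
          .flatMap(_ => Process.eval(fetchAll))
          .flatMap(batch => Process.emitAll(batch))

      // Hypothetical stand-in for the Twitter call and database update.
      val twitterSink: Sink[Task, (Handle, URL)] =
        sink.lift[Task, (Handle, URL)] { case (handle, url) =>
          Task.delay(println(s"fetching $url for $handle"))
        }

      def main(args: Array[String]): Unit = {
        // take(2) keeps the sketch finite; drop it to poll forever.
        rows(3.seconds).take(2).to(twitterSink).run.run
        scheduler.shutdown()
      }
    }
    ```

    Process.eval takes a Task directly, so this shape needs no input element at all, and the sink replaces the actor hand-off.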
    

    Some notes:

    I had asked about using channel.lift without input, but it became clear that the input should be the Cassandra connection.

    The line

    val process =
    (time.awakeEvery(3.second).flatMap(_ => P.emit(connection).
      through(myEffect)))
    

    I changed this from zipWith to flatMap because I wanted to retrieve the results continuously instead of once. With zipWith, the stream ends as soon as the finite list of URLs has been consumed.
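
    The difference between the two combinators can be illustrated with plain Scala iterators. This is only an analogy using the standard library, not scalaz.stream itself: ticks stands in for time.awakeEvery and the two URL strings are made up.

    ```scala
    object ZipVsFlatMap {
      // Made-up URLs standing in for the database results.
      val urls: List[String] = List("url-a", "url-b")

      // Stand-in for time.awakeEvery: an endless supply of ticks.
      def ticks: Iterator[Int] = Iterator.from(1)

      // zip pairs one tick with one URL and stops when the URLs run out.
      val zipped: List[String] = ticks.zip(urls.iterator).map(_._2).toList

      // flatMap re-emits the whole URL list on every tick.
      val repeated: List[String] = ticks.flatMap(_ => urls).take(5).toList

      def main(args: Array[String]): Unit = {
        println(zipped)   // List(url-a, url-b)
        println(repeated) // List(url-a, url-b, url-a, url-b, url-a)
      }
    }
    ```

    The zipped version finishes after one pass over the URLs, while the flatMap version keeps producing the full list for as long as ticks arrive.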