Tags: scala, scalaz, scalaz-stream

Repeatedly eval T => scala.concurrent.Future[T] into a Process[?, T]


I have a function get: T => scala.concurrent.Future[T]

I want to iterate it like this:

   val futs: Iterator[Future[T]] = Iterator.iterate(get(init)){
        _.flatMap(prev => get(prev))
   }

But the element type of this iterator is Future[T], so it is not easy to process.

How can I convert it to a Process[?, T]?

(Maybe T => Future[T] as context type F).
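
To make the difficulty concrete, here is a minimal sketch using only the standard library. It assumes T = Int and a hypothetical stand-in for get that adds one; every element of the iterator is still a Future, so reading the values means sequencing and blocking by hand:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IterateFutures {
  // Hypothetical stand-in for `get`; the real function is not shown in the question.
  def get(i: Int): Future[Int] = Future(i + 1)

  val init: Int = 0

  // Collect the first `n` results. Each element of the iterator is the
  // previous Future chained with one more call to `get`.
  def firstN(n: Int): Vector[Int] = {
    val futs: Iterator[Future[Int]] =
      Iterator.iterate(get(init))(_.flatMap(prev => get(prev)))
    // Turn Vector[Future[Int]] into Future[Vector[Int]], then block for the result.
    val all: Future[Vector[Int]] = Future.sequence(futs.take(n).toVector)
    Await.result(all, 5.seconds)
  }
}
```

This works for a fixed prefix, but it gives none of the streaming combinators that a Process would.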


Solution

  • Not a super nice solution, but it works:

      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.{Future => SFuture}
      import scala.language.implicitConversions
      import scalaz.concurrent.Task
      import scalaz.stream._
    
      // `fut` is taken by name so the Future is created only when the Task is run.
      implicit class Transformer[+T](fut: => SFuture[T]) {
        def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
          import scala.util.{Success, Failure}
          import scalaz.syntax.either._
          Task.async {
            register =>
              fut.onComplete {
                case Success(v) => register(v.right)
                case Failure(ex) => register(ex.left)
              }
          }
        }
      }
    
      val init: Int = 0
    
      def f(i: Int): SFuture[Int] = SFuture(i + 1)
    
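      // The block passed to repeatEval is evaluated once, so `prev` is a single
      // mutable cell shared by every run of the Task built below.
      val p = Process.repeatEval[Task, Int] {
        var prev = init
        // Because `toTask` takes its Future by name, `f(prev)` is re-evaluated on
        // each run and sees the value stored by the previous step.
        f(prev).toTask.map(next => {prev = next; next})
      }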
    
      println(p.take(10).runLog.run)
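
A possible alternative without the mutable var, as a sketch only: it assumes a scalaz-stream version that provides Process.iterateEval (I believe 0.7 and later do), and it rewrites the bridge above as a plain helper function. The object name IterateEvalSketch and the helper toTask are names introduced here for illustration. Since iterateEval emits the start value first, drop(1) is used to match the output of the repeatEval version.

```scala
import scala.concurrent.{ExecutionContext, Future => SFuture}
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.Process

object IterateEvalSketch {
  // Same Future-to-Task bridge as above, as a plain function.
  // `fut` is by-name so the Future starts only when the Task is run.
  def toTask[T](fut: => SFuture[T])(implicit ec: ExecutionContext): Task[T] =
    Task.async { register =>
      fut.onComplete {
        case Success(v)  => register(\/-(v))
        case Failure(ex) => register(-\/(ex))
      }
    }

  // Stand-in for `get`, same as `f` in the answer above.
  def f(i: Int)(implicit ec: ExecutionContext): SFuture[Int] = SFuture(i + 1)

  def run(): IndexedSeq[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    // iterateEval threads each result into the next call of the effectful step,
    // so no shared mutable state is needed.
    val p: Process[Task, Int] =
      Process.iterateEval(0)(i => toTask(f(i))).drop(1)
    p.take(10).runLog.run
  }
}
```

Calling IterateEvalSketch.run() should produce the same ten values as the repeatEval version, with the state passed explicitly from one step to the next.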