Search code examples
scalaakkaakka-streamakka-httpscala-cats

how to integrate akka actors with decline


I have an application with akka, akka http, and akka streams, so it is not rest or microservice is a ephemeral app; it is a data extractor from a rest api and the app writes to a postgres db, and it works just i have a doubt , if it is correct to integrate decline and akka in this way.

code :

object App extends CommandIOApp(name = "app" , header = "extractor", version = "0.1.0") {

 
  val config = Config( token, jdbcDriver = jdbcDriver, jdbcURL = jdbcURL, jdbcUser = jdbcUser, jdbcPassword = jdbcPassword, jdbcSchema = jdbcSchema)

  override def main: Opts[IO[ExitCode]] =
    Opts.subcommand(name="extract-teams", help="Extract Teams from API"){

      val tableName = Opts.option[String]("recursive", short="r", help="Recursive Extraction")
      val outputFile = Opts.option[Path]("output", short="o", help="Output file").withDefault(Paths.get("output.csv"))

      ( tableName, outputFile).mapN{( table, output) =>
        println(table)
        println(output)

        ClickUpExtractions.extractTeams()

        IO(ExitCode.Success)
      }
    }

  object ClickUpExtractions extends ActorGlobalImplicits {

    def extractTeams(): Unit ={

      import com.arkondata.extractors.ClickUpTeamsActions._
      val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config), name = "EngineActor")
      val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config) , name = "Writer")

      extractorFetcher ! Fetch(extractorWriter)

    }

  }

}

receive code:

override def receive: Receive = {
    case WritePG(teamsData) =>
      println("-------writer")
      println(teamsData)
      val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
      println(teamsPG)

      println("insert-----------------------")
      //// manejar exception aqui  importante!!!
      val upsertStatement = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
      BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafeToFuture()//.unsafeRunSync()
      //// manejar exception aqui  importante!!!
      println("insert doobie")
      println("end------------------------")

      self ! "mensaje salida myself!!!!"
    case msg:String =>
      println(msg)
      context.system.terminate()
      System.exit(1) // this is the last step in the akka flow
  }

note:

System.exit(1) // this is the last step in the akka flow

also i use other libraries like doobie, decline, cats: https://ben.kirw.in/decline/effect.html . The applications works just i need to know if it is correct or if exists a better integration between akka and this : IO(ExitCode.Success)

is there any way to get from akka a tell response and validate it, something like :

val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res)) 
   IO(ExitCode.Success)
else 

   IO(ExitCode.Error)

Solution

  • Akka ecosystem relies on Future. Cats Effect relies on some F[_] - the build-in is IO but there are also Monix's Task and ZIO.

    Translation Future <=> IO can be done like:

    io.unsafeToFuture
    
    IO.fromFuture(IO(start future here)) // Future start is side-effect on its own
    

    Similarly for Monix:

    task.runToFuture // requires Scheduler
    
    Task.deferFuture(future thunk) // or
    Task.deferFutureAction(implicit scheduler => future)
    

    If you are using tagless then it's:

    Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]
    
    val runToFuture: F ~> Future // AFAIR there is no popular type class for it
    runToFuture(io)
    

    If you are are using streams then there is streamz library that handles Akka Streams <=> FS2 translations. Akka Streams, Monix's Observable implements, and FS2 provide an implementation for Reactive Streams so you could make these libraries talk to each other using RS interfaces, so libraries like streamz are just a convenience tools.

    There are no other integrations AFAIK and there is no need for them.

    However, I would recommend getting some basic knowledge how Akka and Future work (eagerness, memoization, Execution Contexts etc) vs how Cats Effect work (usually laziness, no memoization, no EC besides initialization of Clock or ContextShift in CE IO, slight differences between CE IO and Monix e.g. Scheduler). If you don't know the difference between them you can easily do some damage.