I have an application with akka, akka http, and akka streams, so it is not rest or microservice is a ephemeral app; it is a data extractor from a rest api and the app writes to a postgres db, and it works just i have a doubt , if it is correct to integrate decline and akka in this way.
code :
object App extends CommandIOApp(name = "app" , header = "extractor", version = "0.1.0") {
val config = Config( token, jdbcDriver = jdbcDriver, jdbcURL = jdbcURL, jdbcUser = jdbcUser, jdbcPassword = jdbcPassword, jdbcSchema = jdbcSchema)
override def main: Opts[IO[ExitCode]] =
Opts.subcommand(name="extract-teams", help="Extract Teams from API"){
val tableName = Opts.option[String]("recursive", short="r", help="Recursive Extraction")
val outputFile = Opts.option[Path]("output", short="o", help="Output file").withDefault(Paths.get("output.csv"))
( tableName, outputFile).mapN{( table, output) =>
println(table)
println(output)
ClickUpExtractions.extractTeams()
IO(ExitCode.Success)
}
}
object ClickUpExtractions extends ActorGlobalImplicits {
def extractTeams(): Unit ={
import com.arkondata.extractors.ClickUpTeamsActions._
val extractorFetcher = system.actorOf(ClickUpTeamsFetcher.props(config), name = "EngineActor")
val extractorWriter = system.actorOf(ClickUpTeamsBulk.props(config) , name = "Writer")
extractorFetcher ! Fetch(extractorWriter)
}
}
}
receive code:
override def receive: Receive = {
case WritePG(teamsData) =>
println("-------writer")
println(teamsData)
val teamsPG: List[Teams] = teamsData.map(data => Teams(data.id, data.name, data.color, data.avatar))
println(teamsPG)
println("insert-----------------------")
//// manejar exception aqui importante!!!
val upsertStatement = "insert into Teams (id, name, color, avatar) values ( ? , ? , ? , ?) ON CONFLICT(id) DO UPDATE SET name = EXCLUDED.name"
BulkWriter(config).insertMany[Teams](teamsPG,upsertStatement).unsafeRunSync()//.unsafeToFuture()//.unsafeRunSync()
//// manejar exception aqui importante!!!
println("insert doobie")
println("end------------------------")
self ! "mensaje salida myself!!!!"
case msg:String =>
println(msg)
context.system.terminate()
System.exit(1) // this is the last step in the akka flow
}
note:
System.exit(1) // this is the last step in the akka flow
also i use other libraries like doobie, decline, cats: https://ben.kirw.in/decline/effect.html .
The applications works just i need to know if it is correct or if exists a better integration between akka and this : IO(ExitCode.Success)
is there any way to get from akka a tell response and validate it, something like :
val res = extractorFetcher ! Fetch(extractorWriter)
if (validate(res))
IO(ExitCode.Success)
else
IO(ExitCode.Error)
Akka ecosystem relies on Future
. Cats Effect relies on some F[_]
- the build-in is IO
but there are also Monix's Task
and ZIO.
Translation Future <=> IO can be done like:
io.unsafeToFuture
IO.fromFuture(IO(start future here)) // Future start is side-effect on its own
Similarly for Monix:
task.runToFuture // requires Scheduler
Task.deferFuture(future thunk) // or
Task.deferFutureAction(implicit scheduler => future)
If you are using tagless then it's:
Async.fromFuture(Sync[F].delay(start future)) // requires implicit Async[F]
val runToFuture: F ~> Future // AFAIR there is no popular type class for it
runToFuture(io)
If you are are using streams then there is streamz library that handles Akka Streams <=> FS2 translations. Akka Streams, Monix's Observable implements, and FS2 provide an implementation for Reactive Streams so you could make these libraries talk to each other using RS interfaces, so libraries like streamz are just a convenience tools.
There are no other integrations AFAIK and there is no need for them.
However, I would recommend getting some basic knowledge how Akka and Future work (eagerness, memoization, Execution Contexts etc) vs how Cats Effect work (usually laziness, no memoization, no EC besides initialization of Clock
or ContextShift
in CE IO, slight differences between CE IO and Monix e.g. Scheduler). If you don't know the difference between them you can easily do some damage.