Search code examples
scalaakkaakka-stream

Convert Akka Graph DSL Stream return type to Future[T] Instead of T (Wait)


Consider the following class:

class MongoDumpService @Inject()(eventsDao: EventDAO)(implicit val ec: ExecutionContext, mat: Materializer) extends LazyLogging {

  private[services] def toAssetsWriterSink: Sink[List[Asset], FileDetails] = ParquetService.toParquetSingleFile[List[Asset]](AppConfig.AssetsFileName)
  private[services] def toExpenseWriterSink: Sink[List[Expense], FileDetails] = ParquetService.toParquetSingleFile[List[Expense]](AppConfig.ExpensesFileName)
  private[services] def toReportsWriterSink: Sink[List[Report], FileDetails] = ParquetService.toParquetSingleFile[List[Report]](AppConfig.ReportsFileName)
  private[services] def toTransactionsWriterSink: Sink[List[Transaction], FileDetails] = ParquetService.toParquetSingleFile[List[Transaction]](AppConfig.TransactionsFileName)
  private[services] def toEventsWriterSink: Sink[PacificOriginalEvent, FileDetails] = ParquetService.toParquetSingleFile[PacificOriginalEvent](AppConfig.PacificOriginalEventFileName)


  def createMongoDump(recordingId: BSONObjectID, maxDocs: Option[Int] = None): List[FileDetails] = RunnableGraph.fromGraph(
    GraphDSL.create(toAssetsWriterSink, toExpenseWriterSink, toReportsWriterSink, toTransactionsWriterSink, toEventsWriterSink, sharedKillSwitch.flow[Event])((f1,f2,f3,f4,f5,_) => List(f1,f2,f3,f4,f5)) {
      import GraphDSL.Implicits._
      implicit builder =>
        (writeAssets, writeExpenses, writeReports, writeTransactions, writerEvents, sw) =>

          val source    = builder.add(eventsDao.getEventsSource(recordingId.stringify, maxDocs))
          val broadcast = builder.add(Broadcast[Event](5))

          source ~> sw ~> broadcast
                          broadcast.out(Write.PacificEvents).map(_.pacificEvent)                                     ~> writerEvents
                          broadcast.out(Write.Expenses).filter(_.expenses.isDefined).map(_.expenses.get)             ~> writeExpenses
                          broadcast.out(Write.Assets).filter(_.assets.isDefined).map(_.assets.get)                   ~> writeAssets
                          broadcast.out(Write.Reports).filter(_.reports.isDefined).map(_.reports.get)                ~> writeReports
                          broadcast.out(Write.Transactions).filter(_.transactions.isDefined).map(_.transactions.get) ~> writeTransactions

          ClosedShape

    }).run()

}

This code is return List[FileDetails]], its actually writing Event Object which includes some fields of Option[List[T]] to the file its supposed to be written, for example fieldA ~> writerFieldA and so on the problem is as follows:

I want to Wait Until this operation will be finished, since this will upload to S3 files with 0KB:

 private[actors] def uploadDataToS3(recording: Recording) = {
    logger.info(s"Uploading data to S3 with recordingId: ${recording._id.stringify}")
    val details = mongoDumpService.createMongoDump(recording._id, recording.limit)
    s3Service.uploadFiles(recording._id.stringify, details)
  }

Without graph DSL i can do runWith witch returns Future[..]

How can i achieve this with graphDSL? ( I want to return Future[List[FileDetails]]]

Edit :

Added toParquetSingleFile

 def toParquetSingleFile[In](fileName: String)(implicit
                                                ec: ExecutionContext,
                                                mat: Materializer,
                                                writes: Writes[In]): Sink[In, FileDetails] = {
    val absolutePath = TEMP_DIRECTORY + File.separator + s"$fileName.${FileExtension.PARQUET.toSuffix}"
    toJsString[In]
      .log(s"ParquetService", _ => s"[✍️] - Writing element toParquetSingleFile for path: $absolutePath ...")
      .withAttributes(Attributes.logLevels(onFailure = LogLevels.Error, onFinish = LogLevels.Off, onElement = LogLevels.Info))
      .to(
        ParquetStreams.toParquetSingleFile(
          path = absolutePath,
          options = ParquetWriter.Options(
            writeMode = ParquetFileWriter.Mode.OVERWRITE,
            compressionCodecName = CompressionCodecName.GZIP))
      ).mapMaterializedValue(_ => FileDetails(absolutePath, FileExtension.PARQUET))
  }

Solution:

def toParquetSingleFile[In](fileName: String)(implicit ec: ExecutionContext, mat: Materializer, writes: Writes[In]): Sink[In, Future[Option[FileDetails]]] = {
    val absolutePath = TEMP_DIRECTORY + File.separator + s"$fileName.${FileExtension.PARQUET.toSuffix}"
    toJsString[In]
      .toMat(
        Sink.lazySink(() => ParquetStreams.toParquetSingleFile(
          path = absolutePath,
          options = ParquetWriter.Options(
            writeMode = ParquetFileWriter.Mode.OVERWRITE,
            compressionCodecName = CompressionCodecName.GZIP))
        )
      )(Keep.right)
      .mapMaterializedValue(_.flatten
        .map { _ =>
          logger.info(s"[ParquetService] - [✍️] Writing file: [$absolutePath] Finished!")
          Some(FileDetails(absolutePath, FileExtension.PARQUET))
        }
        .recover {
          case _: NeverMaterializedException => Option.empty[FileDetails]
        }
      )
  }

Solution

  • As I see, this toParquetSingleFile creates a Sink with a Future[Done] as materialized value. But, in your function you are returning via mapMaterializedValue one FileDetails instance. I think that the mapMaterializedValue function that you are using accepts a function of

    mapMaterializedValue(mat: Future[Done] => Mat2)
    

    So if you map the Future[Done] to a Future[FileDetails] you will have a List[Future[FileDetails]] that you can flatten using Future sequence operation or other approach to get the Future[List[FileDetails]]

    Trying to simulate your scenario, you have a function that create a Sink that writes a file and materializes a Future[Done]:

    case class FileDetails(absPath: String, fileExtension: Int)
    
    def sink[In] : Sink[In, Done] = ???
    

    remove the mapMaterializedValue from your function and you will have something like the above.

    Then, create a function that maps that materialized value:

    def mapMatValue[In](in: Sink[In, Future[Done]]) =
      in.mapMaterializedValue(result => result.map(_ => FileDetails("path", 0))
    

    Using that, your createMongoDump should return Sink[In, List[Future[FileDetails]]

    And finally, use Future.sequence(list) to obtain a Future[List[Future.sequence]]. You could use traverse too.