scala, akka, akka-stream, akka-persistence

How to save streaming data using Akka Persistence


I use StreamRefs to establish streaming connections between actors in the cluster. Currently, on the writing node, I save incoming messages to a log file manually, but I wonder whether it is possible to replace this with a persistent Sink for writing and a persistent Source for reading from the Akka Persistence journal on actor startup. I've been thinking of replacing the log file sink with a persistent actor's persist { evt => ... }, but since it executes asynchronously, I would lose backpressure. So, is it possible to write streaming data into the Akka Persistence journal with backpressure, and to read this data in a streaming manner on actor recovery?

Current implementation:

object Writer {
  case class WriteSinkRequest(userId: String) 
  case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
  case class ReadSourceRequest(userId: String)
  case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}

class Writer extends Actor {

    // code omitted

    val logsDir = "logs"

    val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))

    def logFile(id: String) = {
        path.resolve(id)
    }

    def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] = FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))
    def logFileSource(logId: String): Source[ByteString, Future[IOResult]] = FileIO.fromPath(logFile(logId))

    override def receive: Receive = {
        case WriteSinkRequest(userId) => 
            // obtain the sink that incoming data should be written to:
            val sink = logFileSink(userId)
            // materialize the SinkRef (the remote is like a source of data for us):
            val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
            // wrap the SinkRef in a domain message so that the sender knows which sink it is
            val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
            // reply to sender
            reply.pipeTo(sender())

        case ReadSourceRequest(userId) =>
            val source = logFileSource(userId)
            val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
            val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
            reply pipeTo sender()

    }
}

P.S. Instead of a "save-to-journal" sink, is it possible to create a flow: incoming data to write ~> save to the persistence journal ~> data that was written?


Solution

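  • One idea for streaming data to a persistent actor in a backpressured fashion is to use Sink.actorRefWithAck (deprecated in Akka 2.6 in favor of Sink.actorRefWithBackpressure): the actor sends an acknowledgement message only after it has persisted a message, and the stream does not emit the next element until it receives that acknowledgement. This would look something like the following: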

    // ...
    case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])    
    // ...
    
    def receive = {
      case WriteSinkRequest(userId) =>
        val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
                                            // as well as the messages used in persistentSink
    
        val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
          persistentActor,
          /* additional parameters: see the docs */
        )
    
        val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
        val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
        reply.pipeTo(sender())
    
      case ReadSourceRequest(userId) =>
        // ...
    }
    

    The above example uses a custom case class named MyMsg instead of ByteString.
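
    The persistent actor on the receiving end of persistentSink could be sketched as follows. This is only an illustration under assumptions, not part of the original answer: it targets classic Akka 2.5.x, and the names JournalWriter, JournalSink, StreamInit, Ack, StreamComplete, StreamFailure, the payload field of MyMsg, and the persistenceId format are all hypothetical. The point it shows is that the acknowledgement is sent from inside the persist callback, so the stream only emits the next element once the previous one is in the journal.

    ```scala
    import akka.NotUsed
    import akka.actor.{ActorLogging, ActorRef, Props}
    import akka.persistence.PersistentActor
    import akka.stream.scaladsl.Sink

    // Hypothetical message type; the original answer does not show its fields.
    final case class MyMsg(payload: String)

    object JournalWriter {
      // Hypothetical protocol messages exchanged with Sink.actorRefWithAck.
      case object StreamInit
      case object Ack
      case object StreamComplete
      final case class StreamFailure(cause: Throwable)

      def props(userId: String): Props = Props(new JournalWriter(userId))
    }

    class JournalWriter(userId: String) extends PersistentActor with ActorLogging {
      import JournalWriter._

      // Assumed naming scheme: one journal stream per user.
      override def persistenceId: String = s"writer-$userId"

      // Events replayed from the journal when the actor (re)starts.
      override def receiveRecover: Receive = {
        case msg: MyMsg => log.debug("Recovered {}", msg)
      }

      override def receiveCommand: Receive = {
        case StreamInit =>
          // The sink waits for this first Ack before sending any element.
          sender() ! Ack

        case msg: MyMsg =>
          val replyTo = sender()
          // Acknowledge only after the journal has confirmed the write.
          persist(msg) { _ => replyTo ! Ack }

        case StreamComplete =>
          context.stop(self)

        case StreamFailure(cause) =>
          log.error(cause, "Upstream failed")
          context.stop(self)
      }
    }

    object JournalSink {
      // Positional arguments after the actor: init message, ack message,
      // completion message, and a function that wraps an upstream failure.
      def apply(persistentActor: ActorRef): Sink[MyMsg, NotUsed] =
        Sink.actorRefWithAck[MyMsg](
          persistentActor,
          JournalWriter.StreamInit,
          JournalWriter.Ack,
          JournalWriter.StreamComplete,
          (ex: Throwable) => JournalWriter.StreamFailure(ex)
        )
    }
    ```

    With this in place, something like JournalSink(context.actorOf(JournalWriter.props(userId))) could stand in for persistentSink above. A journal plugin must be configured for persist to succeed.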

    In the sender, assuming it's an actor:

    def receive = {
      case WriteSinkReady(userId, sinkRef) =>
        source.runWith(sinkRef) // source is a Source[MyMsg, _]
    
      // ...
    }
    

    The materialized stream in the sender will send the messages to the persistent actor.
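
    For the reading side, which the answer leaves as "// ...", one possible approach is Akka Persistence Query, which exposes the journal as a Source. The sketch below is an assumption-laden illustration rather than the answer's own method: it presumes the LevelDB journal plugin (other plugins provide their own read journal class and identifier), and JournalReader, persistedMessages, and the payload field of MyMsg are hypothetical names.

    ```scala
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.persistence.query.PersistenceQuery
    import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
    import akka.stream.scaladsl.Source

    // Hypothetical message type, declared here so the snippet stands alone.
    final case class MyMsg(payload: String)

    object JournalReader {
      // Streams the events stored so far for one persistenceId, then completes.
      // The source is backpressured like any other Akka Streams source.
      def persistedMessages(persistenceId: String)(
          implicit system: ActorSystem): Source[MyMsg, NotUsed] = {
        // Assumes the LevelDB read journal is available and configured.
        val readJournal = PersistenceQuery(system)
          .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

        readJournal
          .currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
          .map(_.event)                          // unwrap the EventEnvelope
          .collect { case msg: MyMsg => msg }    // keep only MyMsg events
      }
    }
    ```

    In the ReadSourceRequest case, such a source could be run with StreamRefs.sourceRef() in the same way the file-based source is in the question, using whatever persistenceId the writing actor uses. eventsByPersistenceId (without "current") would keep the stream open and also emit events persisted later.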