Search code examples
scalaakka-stream

Implementing custom Akka Streams Source based on ActorPublisher


I would like to implement custom Source[ByteSting] in Akka Stream. This source should just read data from supplied file and within supplied bytes range and propagate it downstream.

At first I thought, that this can be done by implementing Actor which mixes in ActorPublisher. This implementation is analogous to akka.stream.impl.io.FilePublisher which reads the whole file from supplied path instead of just the data from given bytes range:

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

import akka.actor.{ActorLogging, DeadLetterSuppression, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString

import scala.annotation.tailrec
import scala.util.control.NonFatal

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString]
  with ActorLogging{

  import FilePublisher._

  private val chunksToBuffer = 10
  private var bytesLeftToRead = endByte - startByte + 1
  private var fileChannel: FileChannel = _
  private val buffer = ByteBuffer.allocate(8096)

  private var bufferedChunks: Vector[ByteString] = _

  override def preStart(): Unit = {
    try {
      log.info("Starting")
      fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ)
      bufferedChunks = readAhead(Vector.empty, Some(startByte))
      log.info("Chunks {}", bufferedChunks)
    } catch {
      case NonFatal(ex) => onErrorThenStop(ex)
    }
  }

  override def postStop(): Unit = {

    log.info("Stopping")
    if (fileChannel ne null)
      try fileChannel.close() catch {
        case NonFatal(ex) => log.error(ex, "Error during file channel close")
    }
  }

  override def receive: Receive = {
    case Request =>
      readAndSignalNext()
      log.info("Got request")
    case Continue =>
      log.info("Continuing reading")
      readAndSignalNext()
    case Cancel =>
      log.info("Cancel message got")
      context.stop(self)
  }

  private def readAndSignalNext() = {

    log.info("Reading and signaling")
    if (isActive) {
      bufferedChunks = readAhead(signalOnNext(bufferedChunks), None)
      if (isActive && totalDemand > 0) self ! Continue
    }
  }

  @tailrec
  private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = {

    if (chunks.nonEmpty && totalDemand > 0) {
      log.info("Signaling")
      onNext(chunks.head)
      signalOnNext(chunks.tail)
    } else {
      if (chunks.isEmpty && bytesLeftToRead > 0) {
        onCompleteThenStop()
      }
      chunks
    }
  }

  @tailrec
  private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = {

    if (currentlyBufferedChunks.size < chunksToBuffer) {

      val bytesRead = readDataFromChannel(startPosition)
      log.info("Bytes read {}", bytesRead)
      bytesRead match {
        case Int.MinValue => Vector.empty
        case -1 =>
          log.info("EOF reached")
          currentlyBufferedChunks // EOF reached
        case _ =>
          buffer.flip()
          val chunk = ByteString(buffer)
          buffer.clear()

          bytesLeftToRead -= bytesRead
          val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt)
          readAhead(currentlyBufferedChunks :+ trimmedChunk, None)
      }

    } else {
      currentlyBufferedChunks
    }
  }

  private def readDataFromChannel(startPosition: Option[Long]): Int = {
    try {
      startPosition match {
        case Some(position) => fileChannel.read(buffer, position)
        case None => fileChannel.read(buffer)
      }
    } catch {
      case NonFatal(ex) =>
        log.error(ex, "Got error reading data from file channel")
        Int.MinValue
    }
  }
}

object FilePublisher {

  private case object Continue extends DeadLetterSuppression

  def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte)
}

But turns out that when I materialize Source backed by my FilePublisher like this:

val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength))
val future = fileSource.runWith(Sink.seq) 

nothing happens and the source doesn't propagate data further downstream.

Is there any other correct way to materialize Source based on my FilePublisher or should I not use this API and just implement custom processing stage like described here ?

The problem with CustomStage approach is that its trivial implementation will perform IO straight away in this stage. I guess, I could move IO from stage to custom thread pool or actor, but this would require some form of synchronization between stage and the actor. Thanks.


Solution

  • The problem was caused by mistake in pattern matching of receive method: This line case Request => should be instead case Request(_) because Request is actually case class with single parameter (final case class Request(n: Long))and not case object as I thought.