I would like to implement custom Source[ByteSting]
in Akka Stream. This source should just read data from supplied file and within supplied bytes range and propagate it downstream.
At first I thought, that this can be done by implementing Actor which mixes in ActorPublisher. This implementation is analogous to akka.stream.impl.io.FilePublisher
which reads the whole file from supplied path instead of just the data from given bytes range:
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}
import akka.actor.{ActorLogging, DeadLetterSuppression, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString
import scala.annotation.tailrec
import scala.util.control.NonFatal
class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString]
with ActorLogging{
import FilePublisher._
private val chunksToBuffer = 10
private var bytesLeftToRead = endByte - startByte + 1
private var fileChannel: FileChannel = _
private val buffer = ByteBuffer.allocate(8096)
private var bufferedChunks: Vector[ByteString] = _
override def preStart(): Unit = {
try {
log.info("Starting")
fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ)
bufferedChunks = readAhead(Vector.empty, Some(startByte))
log.info("Chunks {}", bufferedChunks)
} catch {
case NonFatal(ex) => onErrorThenStop(ex)
}
}
override def postStop(): Unit = {
log.info("Stopping")
if (fileChannel ne null)
try fileChannel.close() catch {
case NonFatal(ex) => log.error(ex, "Error during file channel close")
}
}
override def receive: Receive = {
case Request =>
readAndSignalNext()
log.info("Got request")
case Continue =>
log.info("Continuing reading")
readAndSignalNext()
case Cancel =>
log.info("Cancel message got")
context.stop(self)
}
private def readAndSignalNext() = {
log.info("Reading and signaling")
if (isActive) {
bufferedChunks = readAhead(signalOnNext(bufferedChunks), None)
if (isActive && totalDemand > 0) self ! Continue
}
}
@tailrec
private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = {
if (chunks.nonEmpty && totalDemand > 0) {
log.info("Signaling")
onNext(chunks.head)
signalOnNext(chunks.tail)
} else {
if (chunks.isEmpty && bytesLeftToRead > 0) {
onCompleteThenStop()
}
chunks
}
}
@tailrec
private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = {
if (currentlyBufferedChunks.size < chunksToBuffer) {
val bytesRead = readDataFromChannel(startPosition)
log.info("Bytes read {}", bytesRead)
bytesRead match {
case Int.MinValue => Vector.empty
case -1 =>
log.info("EOF reached")
currentlyBufferedChunks // EOF reached
case _ =>
buffer.flip()
val chunk = ByteString(buffer)
buffer.clear()
bytesLeftToRead -= bytesRead
val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt)
readAhead(currentlyBufferedChunks :+ trimmedChunk, None)
}
} else {
currentlyBufferedChunks
}
}
private def readDataFromChannel(startPosition: Option[Long]): Int = {
try {
startPosition match {
case Some(position) => fileChannel.read(buffer, position)
case None => fileChannel.read(buffer)
}
} catch {
case NonFatal(ex) =>
log.error(ex, "Got error reading data from file channel")
Int.MinValue
}
}
}
object FilePublisher {
private case object Continue extends DeadLetterSuppression
def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte)
}
But turns out that when I materialize Source
backed by my FilePublisher
like this:
val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength))
val future = fileSource.runWith(Sink.seq)
nothing happens and the source doesn't propagate data further downstream.
Is there any other correct way to materialize Source
based on my FilePublisher
or should I not use this API and just implement custom processing stage like described here ?
The problem with CustomStage approach is that its trivial implementation will perform IO straight away in this stage. I guess, I could move IO from stage to custom thread pool or actor, but this would require some form of synchronization between stage and the actor. Thanks.
The problem was caused by mistake in pattern matching of receive
method:
This line case Request =>
should be instead case Request(_)
because Request
is actually case class with single parameter (final case class Request(n: Long)
)and not case object as I thought.