
How to read binary Avro file data with a Source in Akka?


I'm trying to read an Avro file with a Source from Akka Streams.

A Source in Akka Streams can read data with something like FileIO.fromPath(file), and the resulting bytes can then be split into lines on the \n character. How does this work for Avro?

Flow :

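    import akka.NotUsed
    import akka.stream.scaladsl.Flow
    import org.apache.avro.generic.GenericRecord

    object AvroFlow {
      // Note: jobDate is currently unused; the date value is hard-coded.
      def apply(jobDate: String): Flow[GenericRecord, GenericRecord, NotUsed] =
        Flow[GenericRecord].map { rec => rec.put("date", "20190812"); rec }
    }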

Graph:

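    import akka.{Done, NotUsed}
    import akka.stream.ClosedShape
    import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
    import org.apache.avro.generic.GenericRecord
    import scala.concurrent.Future

    object AvroRunner {
      // snk is typed as a Sink here; a Flow takes three type parameters and cannot end the graph.
      def build(src: Source[GenericRecord, NotUsed],
                flw: Flow[GenericRecord, GenericRecord, NotUsed],
                snk: Sink[GenericRecord, Future[Done]]): AvroRunner =
        new AvroRunner(src, flw, snk)
    }

    class AvroRunner private (src: Source[GenericRecord, NotUsed],
                              flw: Flow[GenericRecord, GenericRecord, NotUsed],
                              snk: Sink[GenericRecord, Future[Done]]) {
      import scala.concurrent.ExecutionContext.Implicits.global
      val GraphRunner = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        src ~> flw ~> snk
        ClosedShape
      })
    }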

Solution

  • The easiest way to create an Akka Source of Avro data objects is not to start from the raw binary file itself. Instead, create the Source from the DataFileReader provided by the Avro library.

    Following the Avro documentation, we first create the file reader from a function that produces a java.io.File:

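    // Requires java.io.File, scala.reflect.{ClassTag, classTag}, and Avro's DataFileReader and SpecificDatumReader.
    def createFileReader[T : ClassTag](fileGenerator : () => File) : DataFileReader[T] = 
      new DataFileReader[T](fileGenerator(), new SpecificDatumReader[T](classTag[T].runtimeClass.asInstanceOf[Class[T]]))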
    

    The reader can then be wrapped in a Scala Iterator:

    def dataFileReaderToIterator[T](dataFileReader : DataFileReader[T]) : Iterator[T] = 
      new Iterator[T] {
        override def hasNext : Boolean = dataFileReader.hasNext
    
        override def next() : T = dataFileReader.next
      }
    

    We can now construct a stream Source from the file generator:

    // The ClassTag context bound is needed because createFileReader requires it.
    def fileToAvroSource[T : ClassTag](fileGenerator : () => File) : Source[T, NotUsed] = 
      Source.fromIterator[T](() => dataFileReaderToIterator[T](createFileReader[T](fileGenerator)))
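
    The iterator-based approach above never closes the DataFileReader, and the question works with GenericRecord rather than a generated class. As a minimal sketch of one possible variant (not part of the original answer), the reader could be managed with Source.unfoldResource and a GenericDatumReader. The names AvroSources and genericAvroSource are hypothetical, and the sketch assumes Akka Streams and Avro are on the classpath:

    ```scala
    import java.io.File

    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    // Hypothetical helper object, introduced only for illustration.
    object AvroSources {

      // Emits every record of an Avro container file as a GenericRecord.
      // The reader is opened when the stream starts and closed when the
      // stream completes, fails, or is cancelled.
      def genericAvroSource(file: File): Source[GenericRecord, NotUsed] =
        Source.unfoldResource[GenericRecord, DataFileReader[GenericRecord]](
          // create: open the reader; the schema is taken from the file header
          () => new DataFileReader[GenericRecord](file, new GenericDatumReader[GenericRecord]()),
          // read: Some(record) while records remain, None to complete the stream
          reader => if (reader.hasNext) Some(reader.next()) else None,
          // close: release the underlying file handle
          reader => reader.close()
        )
    }
    ```

    A source built this way has the type Source[GenericRecord, NotUsed], so it should fit the src parameter of AvroRunner.build from the question.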
    

    Backpressure?

    Avro appears to read the file with standard blocking java.io stream techniques, and Source.fromIterator only calls next() on the iterator when downstream signals demand. The above implementation should therefore propagate backpressure all the way back to the reads from the file. However, I have not confirmed this to be the case.
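
    One way to probe the demand-driven behaviour without involving Avro is to count how often an iterator is actually advanced. The sketch below is only an illustration under assumptions: it uses an endless counting iterator as a stand-in for the DataFileReader, the names DemandDemo and pulledFor are hypothetical, and it assumes Akka 2.6 or later, where an implicit ActorSystem provides the materializer:

    ```scala
    import java.util.concurrent.atomic.AtomicInteger

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    // Hypothetical demo object, introduced only for illustration.
    object DemandDemo {

      // Runs an endless iterator through take(n) and returns the elements
      // received together with how many times the iterator was advanced.
      def pulledFor(n: Int): (Seq[Int], Int) = {
        implicit val system: ActorSystem = ActorSystem("demand-demo")
        val pulled = new AtomicInteger(0)
        try {
          val elements = Await.result(
            Source
              // Each call to next() on this iterator increments the counter.
              .fromIterator(() => Iterator.continually(pulled.incrementAndGet()))
              .take(n)
              .runWith(Sink.seq),
            10.seconds
          )
          (elements, pulled.get())
        } finally {
          system.terminate()
        }
      }
    }
    ```

    If the stream completes and the counter stays close to n even though the iterator never ends, the source was advanced only in response to downstream demand. This does not show how Avro buffers its own file reads, which remains unconfirmed.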