Akka flow Input (`In`) as Output (`Out`)

I am trying to write a piece of code that does the following:

  1. Reads a large CSV file from a remote source such as S3.
  2. Processes the file record by record.
  3. Sends a notification to the user.
  4. Writes the output to a remote location.

Sample record in input csv:


My input case class which represents a record in input csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Sample record in output csv (that needs to be written):

2,Tom,Web Developer

My output case class, which represents a record in the output csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Reading a record using Akka Streams CSV (uses Alpakka reactive S3):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =, s3Object.path)
// This is then converted to csv

Now I have a function to process the records:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
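
The rule in the comments above can be sketched roughly as follows. This is only an illustration: `avgSalary` is a hypothetical extra parameter (the original signature takes only the record), on the assumption that the average is computed in an earlier pass, since a single record cannot know it.

```scala
object ProcessSketch {
  final case class InputRecord(recordId: String, name: String, salary: Long)
  final case class OutputRecord(recordId: String, name: String, designation: String)

  // avgSalary is an assumption for this sketch; it is not part of the
  // original process(input) signature.
  def process(input: InputRecord, avgSalary: Double): OutputRecord = {
    // Above-average salary maps to "Manager", everything else to "Programmer".
    val designation = if (input.salary > avgSalary) "Manager" else "Programmer"
    OutputRecord(input.recordId, input.name, designation)
  }
}
```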

Function to write the OutputRecord as csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
                       metaHeaders = MetaHeaders(Map())

Function to send email notification:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Stitching it all together

readAsCSV.flatMap { recordSource => { record
    val outputRecord = process(record)
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16

On lines 15 and 16 I get an error. I can add either line 15 or line 16, but not both, since both notify and writeOutput need outputRecord. Once notify is called, I lose my outputRecord.

Is there a way I can add both notify and writeOutput to the same graph?

I am not looking for parallel execution, as I want to call notify first and only then writeOutput. So this is not helpful:

The use case seems very simple to me, but somehow I am not able to find a clean solution.


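  • The output of notify is a PushResult, but the input of writeOutput is a ByteString, so the two stages cannot be connected directly. Once you make those types line up, it will compile. If you need a ByteString, derive it from the OutputRecord.

    BTW, in the sample code that you have provided, a similar type mismatch exists between readAsCSV and process: the source emits Map[String, ByteString], while process expects an InputRecord.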
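
    One way to line the types up, as a rough sketch rather than the asker's actual code: let the notification stage emit the record itself once the notification has completed, then render it as a ByteString for the sink. The names `sendNotification`, `notifyAndPass` and `toCsvLine` and the shape of `PushResult` are assumptions made for illustration, `Sink.fold` stands in for the real S3 sink, and Akka 2.6 or later is assumed so that the implicit ActorSystem supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object NotifyThenWrite {
  final case class OutputRecord(recordId: String, name: String, designation: String)
  // Hypothetical stand-in for the question's PushResult type.
  final case class PushResult(delivered: Boolean)

  // Hypothetical stand-in for the real notification call.
  def sendNotification(record: OutputRecord): Future[PushResult] =
    Future.successful(PushResult(delivered = true))

  // Emits the record itself after its notification completes, so the next
  // stage still receives an OutputRecord instead of a PushResult.
  def notifyAndPass(implicit ec: ExecutionContext): Flow[OutputRecord, OutputRecord, NotUsed] =
    Flow[OutputRecord].mapAsync(parallelism = 1) { record =>
      sendNotification(record).map(_ => record)
    }

  // Renders one record as a CSV line, the element type a ByteString sink expects.
  val toCsvLine: Flow[OutputRecord, ByteString, NotUsed] =
    Flow[OutputRecord].map(r => ByteString(s"${r.recordId},${r.name},${r.designation}\n"))

  def run(records: List[OutputRecord])(implicit system: ActorSystem): Future[ByteString] = {
    import system.dispatcher // execution context for the Future in notifyAndPass
    Source(records)
      .via(notifyAndPass)
      .via(toCsvLine)
      // Stand-in for writeOutput: both consume ByteString elements.
      .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
  }
}
```

    With `mapAsync(parallelism = 1)`, a record is emitted downstream only after its notification future has completed, which matches the "notify first, then write" requirement. If the PushResult is also needed downstream, the stage could emit a tuple of record and result instead of the record alone.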