scala, akka-stream, akka-http

Convert Source[ByteString, Any] to Source[ByteString, IOResult]


I have a:

val fileStream: Source[ByteString, Any] = Source.single(ByteString.fromString("Hello"))

This Source[ByteString, Any] type comes from the Akka HTTP fileUpload directive:

https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/file-upload-directives/fileUpload.html

Can I convert this to a Source[ByteString, IOResult]? Alternatively, is there some other operation, similar to Source.single(ByteString.fromString("Hello")), that would give me a Source[ByteString, IOResult] from a string?

I can create an IO result with:

val ioResult: IOResult = IOResult.createSuccessful(1L)

and a ByteString with:

val byteString: ByteString = ByteString.fromString("Hello")

so now I just need them combined as a Source[ByteString, IOResult].

Note that this is just for a unit test. I'm testing a function that returns a Source[ByteString, IOResult], so I'd like to create an instance of this type (without having to create a file) to assert that the function returns the correct ByteString. I don't really care about the IOResult part of the Source.


Solution

    Specific to the fileUpload Directive

    I suspect the fileUpload authors declared the materialized value type as Any to leave room for future changes to the API. Keeping it as Any lets them later switch to whatever type they truly want to settle on.

    Therefore, even if you manage to cast the materialized value to an IOResult, you may run into problems after an Akka upgrade if the underlying type has changed.

    Materialization Generally

    The second type parameter of Source indicates what the stream will "materialize" into. Taking your example code and changing the Any to the actual type, NotUsed, we can show the whole life cycle of the stream:

    val notUsed : NotUsed = Source.single(ByteString.fromString("Hello"))
                                  .toMat(Sink.ignore)(Keep.left)
                                  .run()
    

    As you can see, when the stream is run it is turned into an actual value (i.e. it is materialized). In the above case the type of that value is NotUsed, because a source that emits a single in-memory value has nothing useful to report about how it ran.
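
    To make the role of the Keep combinator more concrete, here is a minimal sketch that runs the same single-element source three times and keeps a different materialized value each time. It assumes Akka 2.6 or later, where an implicit ActorSystem in scope is enough to provide the materializer; the object name KeepSketch and the system name are made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch.
object KeepSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the implicit system also supplies a Materializer.
    implicit val system: ActorSystem = ActorSystem("keep-sketch")

    val source: Source[ByteString, NotUsed] =
      Source.single(ByteString.fromString("Hello"))

    // Keep.left: the materialized value of the Source side (NotUsed here).
    val left: NotUsed = source.toMat(Sink.ignore)(Keep.left).run()

    // Keep.right: the materialized value of the Sink side.
    // For Sink.ignore this is a Future that completes when the stream ends.
    val right: Future[Done] = source.toMat(Sink.ignore)(Keep.right).run()

    // Keep.both: a tuple holding both materialized values.
    val both: (NotUsed, Future[Done]) =
      source.toMat(Sink.ignore)(Keep.both).run()

    // Wait for the two running streams to finish before shutting down.
    Await.result(right, 3.seconds)
    Await.result(both._2, 3.seconds)
    system.terminate()
  }
}
```

    The blueprint (source) is the same in all three cases; only the choice of which side's materialized value to keep differs.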

    Contrast that stream with a stream that operates on a file:

    val file = Paths.get("example.csv")
    
    val fileIOResult: Future[IOResult] = FileIO.fromPath(file)
                                               .to(Sink.ignore)
                                               .run()
    

    In this case the stream reads the contents of the file and streams them to the Sink. Here it would be useful to know whether the file reading ran into any errors. To let you find out how the file reading went, the stream is materialized into a Future[IOResult], which you can use to get information about the read:

    fileIOResult foreach { ioResult =>
      println(s"read ${ioResult.count} bytes from file")
    }
    

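    Therefore, it doesn't really make sense to "convert" an Any into an IOResult: the materialized value reports what a particular stream did when it ran, and a stream that performed no file IO has no genuine IOResult to report.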
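
    That said, for the unit test described in the question, where the IOResult is explicitly a don't-care, one possible approach is to replace the materialized value with mapMaterializedValue. The sketch below builds an in-memory Source[ByteString, IOResult] from a string, using the IOResult.createSuccessful call from the question. The helper name testSource and the object name are hypothetical, the byte count is simply made up to match the payload, and Akka 2.6 or later is assumed so that the implicit ActorSystem provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object and helper names, used only for this sketch.
object MaterializedValueSketch {

  // Builds an in-memory source and swaps its NotUsed materialized value
  // for a fabricated IOResult. No file IO happens here, so the IOResult
  // is test scaffolding, not a real report of a read.
  def testSource(text: String): Source[ByteString, IOResult] = {
    val bytes = ByteString.fromString(text)
    Source
      .single(bytes)
      .mapMaterializedValue(_ => IOResult.createSuccessful(bytes.length.toLong))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the implicit system also supplies a Materializer.
    implicit val system: ActorSystem = ActorSystem("mat-value-sketch")

    // Collect every emitted chunk into one ByteString and keep both
    // the fabricated IOResult and the Future holding the collected bytes.
    val collect = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)
    val (ioResult, contentF) =
      testSource("Hello").toMat(collect)(Keep.both).run()

    val content = Await.result(contentF, 3.seconds)
    println(content.utf8String)
    println(ioResult.count)

    system.terminate()
  }
}
```

    Note that FileIO.fromPath materializes a Future[IOResult] rather than a bare IOResult, so if the function under test actually returns a Source[ByteString, Future[IOResult]], the fabricated value would need to be wrapped, for example with Future.successful.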