Search code examples
scalaakka-stream

Convert a Source to Flow in Scala


How to convert a Source to Flow?

Input: Source[ByteString,NotUsed] a Intermediary Step: Call an API which returns an InputStream Output: Flow[ByteString,ByteString,NotUsed]

I am doing it as: Type of input is = Source[ByteString,NotUsed]

val sink: Sink[ByteString,InputStream] = StreamConverters.asInputStream() 
val output: InputStream = <API CALL>    
val mySource: Source[ByteString,Future[IOResult]] = StreamConverters.fromInputStream(() => output)
val myFlow: Flow[ByteString,ByteString,NotUsed] = Flow.fromSinkAndSource(sink,source)

When I use the above Flow in the source it returns an empty result. Can someone help me figure out of I am doing it right?


Solution

  • I'm not sure tu fully grasp what you want to achieve but maybe this is a use case for flatMapConcat:

    def readInputstream(bs: ByteString): Source[ByteString, Future[IOResult]] =
      // Get some IS from the ByteString
      StreamConverters.fromInputStream(() => ???)
    
    val myFlow: Flow[ByteString, ByteString, NotUsed] = 
      Flow.flatMapConcat(bs => readInputstream(bs))
    
    // And use it like this:
    val source: Source[ByteString] = ???
    source
      .via(myFlow)
      .to(???)