
Reading first bytes from akka.stream.scaladsl.Source


I'm trying to read the first 16 bytes from an akka.stream.scaladsl.Source[ByteString, Any] and return a pair (Array[Byte], Source[ByteString, Any]).

After reading the first 16 bytes, I want to stream the remaining Source as usual.

Use case:

The Source[ByteString, Any] is an encrypted stream, with the first 16 bytes in the stream being the initialization vector. I need to get the initialization vector to be able to decrypt the rest of the stream.
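To make the layout concrete, here is a minimal sketch of the "IV first, ciphertext after" format on plain byte arrays. It assumes AES in CTR mode with a 16-byte key, which the question does not state; IvPrefixLayout, encrypt and decrypt are hypothetical names used only for illustration.

```scala
import java.security.SecureRandom
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

// Hypothetical illustration of the message layout: IV (16 bytes) followed by ciphertext.
// Assumption: AES/CTR/NoPadding; the real stream may use a different mode.
object IvPrefixLayout {
  val IvLength = 16 // AES block size in bytes

  // Encrypts `plain` with a fresh random IV and returns IV ++ ciphertext.
  def encrypt(key: Array[Byte], plain: Array[Byte]): Array[Byte] = {
    val iv = new Array[Byte](IvLength)
    new SecureRandom().nextBytes(iv)
    val cipher = Cipher.getInstance("AES/CTR/NoPadding")
    cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    iv ++ cipher.doFinal(plain)
  }

  // Splits off the first 16 bytes as the IV and decrypts the remainder with it.
  def decrypt(key: Array[Byte], message: Array[Byte]): Array[Byte] = {
    val (iv, ciphertext) = message.splitAt(IvLength)
    val cipher = Cipher.getInstance("AES/CTR/NoPadding")
    cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    cipher.doFinal(ciphertext)
  }
}
```

The streaming version of the problem is the same split, except that the 16 bytes may be spread across several ByteString elements.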

This is what I have tried:

Source.single(ByteString("This is my test string"))
      .prefixAndTail(16).runWith(Sink.head)

I would like something like this, but prefixAndTail takes a number of elements as input, and a number of elements is not a number of bytes. Here the whole ByteString is a single element.

Please let me know if you have any suggestions. Thanks!


Solution

  • A couple of things to note:

    1. Since the input source comes from the network, some of the ByteStrings may be empty.
    2. We need the first 16 bytes to decrypt the rest of the stream correctly.

    The comments in the code explain each step.

    // Requires an implicit ActorSystem (or Materializer) in scope.
    source
        // Source[ByteString] to Source[Byte]; empty ByteStrings contribute nothing
        .mapConcat(_.toList)
        // Take the first 16 bytes plus a stream of the remaining bytes:
        // Source[(Seq[Byte], Source[Byte])]
        .prefixAndTail(16)
        // Pair every remaining byte with the 16-byte prefix:
        // Source[Source[(Seq[Byte], Array[Byte])]]
        .map { case (key, rest) =>
          rest.map(b => (key, Array(b)))
        }
        // Flatten to Source[(Seq[Byte], Array[Byte])]
        .flatMapConcat(identity)
        .runForeach {
          case (key, rest) =>
            println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
        }
    

    An example input containing an empty ByteString:

    val source = Source(List(
      ByteString(""), // empty ByteString to imitate an empty element in the stream
      ByteString("abcdefghijklmnop <- first 16 bytes")
    ))
    

    The expected result has abcdefghijklmnop as the first 16 bytes, printed next to each remaining byte:

    abcdefghijklmnop :  
    abcdefghijklmnop : <
    abcdefghijklmnop : -
    abcdefghijklmnop :  
    abcdefghijklmnop : f
    abcdefghijklmnop : i
    abcdefghijklmnop : r
    abcdefghijklmnop : s
    abcdefghijklmnop : t
    abcdefghijklmnop :  
    abcdefghijklmnop : 1
    abcdefghijklmnop : 6
    abcdefghijklmnop :  
    abcdefghijklmnop : b
    abcdefghijklmnop : y
    abcdefghijklmnop : t
    abcdefghijklmnop : e
    abcdefghijklmnop : s
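
    As the output shows, the approach above emits the rest of the stream one byte at a time. If the remaining data should stay in ByteString chunks, one possible variant is sketched below. It is not the code from the answer: PrefixBytes, rechunkPrefix and splitPrefix are hypothetical names, and the sketch assumes Akka 2.6.x (the scala-cli directives at the top only record that assumption). If the stream ends before n bytes have arrived, this sketch drops the buffered bytes and returns an empty prefix.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical helper object, not part of Akka.
object PrefixBytes {

  // Re-chunks the stream so that its first element holds exactly `n` bytes.
  // Everything after the prefix is passed through in its original chunks.
  def rechunkPrefix(n: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].statefulMapConcat[ByteString] { () =>
      var buffer = ByteString.empty
      var prefixEmitted = false

      chunk =>
        if (prefixEmitted) List(chunk)
        else {
          buffer ++= chunk // empty chunks simply add nothing
          if (buffer.length < n) Nil
          else {
            prefixEmitted = true
            val (prefix, rest) = buffer.splitAt(n)
            buffer = ByteString.empty
            if (rest.isEmpty) List(prefix) else List(prefix, rest)
          }
        }
    }

  // Completes with the first `n` bytes and a Source of the remaining chunks.
  // The returned Source has to be run exactly once by the caller.
  def splitPrefix(source: Source[ByteString, Any], n: Int)(
      implicit mat: Materializer): Future[(ByteString, Source[ByteString, NotUsed])] =
    source
      .via(rechunkPrefix(n))
      .prefixAndTail(1)
      .runWith(Sink.head)
      .map { case (head, tail) =>
        // head is empty when the stream ended before n bytes arrived
        (head.headOption.getOrElse(ByteString.empty), tail)
      }(mat.executionContext)
}
```

    With an implicit ActorSystem in scope, PrefixBytes.splitPrefix(source, 16) would give the initialization vector as a ByteString (toArray turns it into an Array[Byte]) together with a Source that can be decrypted chunk by chunk.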