I'm trying to read the first 16 bytes from an akka.stream.scaladsl.Source[ByteString, Any] and return a pair (Array[Byte], Source[ByteString, Any]).
After reading the first 16 bytes, I want to stream the remaining Source as usual.
Use case:
The Source[ByteString, Any] is an encrypted stream, with the first 16 bytes in the stream being the initialization vector. I need to get the initialization vector to be able to decrypt the rest of the stream.
This is what I have tried:
Source.single(ByteString("This is my test string"))
.prefixAndTail(16).runWith(Sink.head)
I would like something like this; however, prefixAndTail takes a number of elements as input, and the number of elements is not the number of bytes.
Please let me know if you have any suggestions. Thanks!
A couple of things to note:
I will leave some comments in the code as an explanation.
source
  // Source[ByteString] to Source[List[Byte]]
  .via(Flow[ByteString].map(d => d.toByteBuffer.array().toList))
  // Source[List[Byte]] to Source[Byte]
  .mapConcat(identity)
  // Take the first 16 bytes and a stream of the remaining bytes:
  // Source[(Seq[Byte], Source[Byte])]
  .prefixAndTail(16)
  // Source[(Seq[Byte], Source[Byte])] to Source[Source[(Seq[Byte], Array[Byte])]]
  .collect { case (key, tail) =>
    tail.map(b => (key, Array(b)))
  }
  // Source[Source[(Seq[Byte], Array[Byte])]] to Source[(Seq[Byte], Array[Byte])]
  .flatMapConcat(identity)
  .runForeach {
    case (key, rest) =>
      println(s"${key.map(_.toChar).mkString} : ${rest.map(_.toChar).mkString}")
  }
An example test containing an empty ByteString:
val source = Source(List[ByteString](
  ByteString(""), // empty ByteString to imitate an empty element from a database stream
  ByteString("abcdefghijklmnop <- first 16 bytes")
))
The result should show abcdefghijklmnop as the first 16 bytes:
abcdefghijklmnop :
abcdefghijklmnop : <
abcdefghijklmnop : -
abcdefghijklmnop :
abcdefghijklmnop : f
abcdefghijklmnop : i
abcdefghijklmnop : r
abcdefghijklmnop : s
abcdefghijklmnop : t
abcdefghijklmnop :
abcdefghijklmnop : 1
abcdefghijklmnop : 6
abcdefghijklmnop :
abcdefghijklmnop : b
abcdefghijklmnop : y
abcdefghijklmnop : t
abcdefghijklmnop : e
abcdefghijklmnop : s
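The output above shows every remaining byte arriving as its own element, which may be costly for large streams. Here is a minimal sketch of an alternative, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer): buffer incoming chunks only until 16 bytes are available, emit those bytes as the first element, and pass later chunks through untouched, so that prefixAndTail(1) separates the prefix from the rest. The names prefixFirst, splitPrefix, and demo are hypothetical helpers for illustration, not part of Akka. In this sketch, a stream shorter than 16 bytes yields an empty prefix.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object SplitPrefixSketch {

  // Hypothetical helper: re-chunks the stream so that its first element is
  // exactly the first `n` bytes; every later chunk passes through unchanged.
  def prefixFirst(n: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].statefulMapConcat[ByteString] { () =>
      var buffer = ByteString.empty
      var prefixEmitted = false

      { chunk =>
        if (prefixEmitted) List(chunk)
        else {
          buffer = buffer ++ chunk
          if (buffer.length < n) Nil // not enough bytes yet, keep buffering
          else {
            prefixEmitted = true
            val (prefix, rest) = buffer.splitAt(n)
            buffer = ByteString.empty
            if (rest.isEmpty) List(prefix) else List(prefix, rest)
          }
        }
      }
    }

  // Hypothetical helper: pairs the first `n` bytes with a Source of the rest.
  def splitPrefix(
      source: Source[ByteString, Any],
      n: Int
  ): Source[(Array[Byte], Source[ByteString, NotUsed]), Any] =
    source
      .via(prefixFirst(n))
      .prefixAndTail(1)
      .map { case (head, tail) =>
        (head.headOption.map(_.toArray).getOrElse(Array.empty[Byte]), tail)
      }

  // Runs the sketch on a small in-memory stream and returns (prefix, rest) as text.
  def demo(): (String, String) = {
    implicit val system: ActorSystem = ActorSystem("split-prefix-sketch")
    try {
      val source = Source(List(
        ByteString(""), // empty element, as in the test above
        ByteString("abcdefgh"),
        ByteString("ijklmnop <- first 16 bytes")
      ))
      val (iv, remaining) =
        Await.result(splitPrefix(source, 16).runWith(Sink.head), 3.seconds)
      val rest =
        Await.result(remaining.runFold(ByteString.empty)(_ ++ _), 3.seconds)
      (new String(iv, "UTF-8"), rest.utf8String)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (iv, rest) = demo()
    println(s"$iv : $rest")
  }
}
```

The tail Source returned by prefixAndTail has to be materialized promptly (it is subject to Akka's subscription timeout), so the decrypting stage would normally be attached to it right after the prefix is read.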