I'm new to Akka Streams and I'm wondering how to implement some kind of mid-stream validation. Example:
FileIO
  .fromPath(file)
  .via(Framing.delimiter(...))
  .map(_.utf8String)
  .map(_.split("\t", -1))
  .validate(arr => arr.length == 10) // or similar
  ...
I assumed this scenario is so common that there must be predefined functionality for validating a stream on the fly, but I wasn't able to find anything. Am I on the wrong track here? Is validation something that should not be done this way in Akka Streams?
In my particular scenario, I'm processing a file line by line. If even a single line is invalid, it makes no sense to continue, and processing should be aborted.
I'd probably create a type to represent the constraints. You can then do the assertions when creating instances of that type, and downstream stages know which constraints have already been applied.
Example:
object LineItem {
  // Makes it possible to provide the validation before allocating the item
  def apply(string: String): LineItem = {
    require(string.length == 10)
    new LineItem(string) // Call the companion-accessible constructor
  }
}

// private[LineItem] makes sure that `new` only works from companion object
final case class LineItem private[LineItem](string: String)
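To show how such a type could plug into the stream, here is a minimal sketch. It assumes Akka 2.6 or later (where the materializer is derived from the implicit ActorSystem) and a Scala version that allows a custom apply in a case class companion (2.12.2+). The names ValidationSketch and validateAll are made up for illustration, the in-memory Source stands in for the FileIO pipeline from the question, and the length check simply mirrors the snippet above. With the default supervision strategy, an exception thrown inside map fails the stream, so the first invalid line aborts processing.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Compact copy of the validated type from the answer (plain `private` used here).
final case class LineItem private (string: String)

object LineItem {
  def apply(string: String): LineItem = {
    require(string.length == 10, s"expected 10 characters, got ${string.length}")
    new LineItem(string)
  }
}

// Hypothetical wrapper object, only for this illustration.
object ValidationSketch {
  // Runs all lines through the validating stage and waits for the outcome.
  // A Failure means some line did not satisfy the constraint.
  def validateAll(lines: List[String])(implicit system: ActorSystem): Try[Seq[LineItem]] = {
    val result = Source(lines)
      .map(LineItem(_)) // a failed `require` throws, which fails the stream
      .runWith(Sink.seq)
    Try(Await.result(result, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("validation-sketch")
    try {
      println(validateAll(List("0123456789", "abcdefghij")).isSuccess)
      println(validateAll(List("0123456789", "too short", "abcdefghij")).isFailure)
    } finally system.terminate()
  }
}
```

Anything downstream of the map stage only ever sees LineItem values, so it can rely on the constraint without rechecking it. If invalid lines should be skipped or collected instead of aborting the run, mapping to Try or Either rather than throwing would be one alternative.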