scala, akka, akka-stream

Akka Streams: validation of elements being streamed


I'm new to Akka Streams and I'm wondering how to implement some kind of mid-stream validation. Example:

FileIO
  .fromPath(file)
  .via(Framing.delimiter(...))
  .map(_.utf8String)
  .map(_.split("\t", -1))
  .validate(arr => arr.length == 10) // or similar
  ...

I assumed this scenario is common enough that there must be a predefined operator for validating a stream on the fly. However, I wasn't able to find anything about it. Am I on the wrong track here, and is validation something that should not be done this way in Akka Streams?

In my particular scenario, I'm processing a file line by line. If even a single line is invalid, it makes no sense to continue, and processing should be aborted.


Solution

  • I'd probably create a type to represent the constraints. You can then do the assertions when creating instances of that type, and downstream stages know from the type which constraints have already been applied.

    Example:

    object LineItem {
      // Makes it possible to provide the validation before allocating the item
      def apply(string: String): LineItem = {
        require(string.length == 10)
        new LineItem(string) // Call the companion-accessible constructor
      }
    }
    // private[LineItem] makes sure that `new` only works from companion object
    final case class LineItem private[LineItem](string: String)
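
As a rough sketch of how such a type could be used mid-stream, the constructor can simply be called inside `map`. This assumes Akka 2.6 or later (where an implicit `ActorSystem` is enough to run a stream) and the default supervision strategy. `ValidationSketch`, `parseAll`, the in-memory `Source`, and the 5-second timeout are hypothetical names and values chosen for illustration; they are not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object LineItem {
  // Validate before the instance is allocated; `require` throws
  // IllegalArgumentException when the predicate is false.
  def apply(string: String): LineItem = {
    require(string.length == 10, s"expected 10 characters, got ${string.length}")
    new LineItem(string)
  }
}
// private[LineItem] keeps `new` restricted to the companion object
final case class LineItem private[LineItem](string: String)

// Hypothetical helper object, for illustration only
object ValidationSketch {
  // Runs the lines through the validating constructor and collects the result.
  // Returns Failure if any line violates the constraint.
  def parseAll(lines: List[String])(implicit system: ActorSystem): Try[Seq[LineItem]] = {
    val result = Source(lines)
      .map(LineItem(_)) // an exception thrown here fails the stream
      .runWith(Sink.seq)
    // Blocking is only for the sake of a compact example
    Try(Await.result(result, 5.seconds))
  }
}
```

With the default supervision strategy, an exception thrown inside `map` stops the stream, so the first invalid line aborts processing and the materialized `Future` fails with the `IllegalArgumentException`. That matches the "abort on the first bad line" requirement in the question.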