akka-stream

How to buffer and drop a chunked bytestring with a delimiter?


Let's say you have a publisher using broadcast with some fast and some slow subscribers, and you would like to drop whole sets of messages for a slow subscriber without having to keep them in memory. The data consists of chunked ByteStrings, so dropping a single ByteString is not an option.

Each set of ByteStrings is followed by a terminator, ByteString("\n"), so I would need to drop a complete set of ByteStrings up to and including that terminator.

Is that something you can do with a custom graph stage? Can it be done without aggregating and keeping the whole set in memory?


Solution

    Avoid Custom Stages

    Whenever possible, try to avoid custom stages: they are tricky to get right and fairly verbose. Usually some combination of the standard akka-stream stages and plain old functions will do the trick.

    Group Dropping

    Presumably you have some criteria that you will use to decide which group of messages will be dropped:

    type ShouldDropTester = () => Boolean
    

    For demonstration purposes I will use a simple switch that drops every other group:

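    // Yields false, true, false, true, ... on successive calls (one call per group).
    val dropEveryOther : ShouldDropTester = {
      // Create the iterator once so that every call advances the same sequence.
      val flags = Iterator.from(1).map(_ % 2 == 0)
      () => flags.next()
    }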
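
As a quick sanity check of the switch above, the following minimal sketch prints the first few decisions it produces. It is plain Scala with no Akka dependency; the object name `DropEveryOtherDemo` and the factory `makeDropEveryOther` are hypothetical names introduced here so that each caller gets its own fresh iterator.

```scala
object DropEveryOtherDemo {
  type ShouldDropTester = () => Boolean

  // Hypothetical factory: builds a tester backed by its own iterator,
  // so separate testers do not share state.
  def makeDropEveryOther(): ShouldDropTester = {
    val flags = Iterator.from(1).map(_ % 2 == 0)
    () => flags.next()
  }

  def main(args: Array[String]): Unit = {
    val tester = makeDropEveryOther()
    // Call the tester four times, once per (imaginary) group.
    println(List.fill(4)(tester())) // prints List(false, true, false, true)
  }
}
```

Reading `true` as "drop this group", the first group is kept, the second is dropped, and so on.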
    

    We will also need a function that will take in a ShouldDropTester and use it to determine whether an individual ByteString should be dropped:

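    import akka.util.ByteString

    val endOfFile = ByteString("\n")

    val dropGroupPredicate : ShouldDropTester => ByteString => Boolean =
      (shouldDropTester) => {
        // Decision for the group currently passing through.
        var dropGroup = shouldDropTester()

        (byteString) =>
          if (byteString == endOfFile) {
            // The terminator belongs to the current group, so it gets the
            // current decision; only then is the next group's decision fetched.
            val returnValue = dropGroup
            dropGroup = shouldDropTester()
            returnValue
          }
          else {
            dropGroup
          }
      }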
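
To make the per-chunk behaviour concrete, here is a small trace of the same logic, written as a sketch in plain Scala. The assumptions are loud ones: `String` chunks stand in for `ByteString`, and `scripted` is a hypothetical helper that replays a fixed list of decisions instead of a real drop criterion. It also assumes, as in the question, that the terminator always arrives as its own chunk.

```scala
object PredicateTrace {
  // Assumption: Strings stand in for ByteString chunks.
  val terminator = "\n"

  // Same logic as the predicate above, specialised to String chunks.
  def dropGroupPredicate(shouldDrop: () => Boolean): String => Boolean = {
    var dropGroup = shouldDrop() // decision for the first group
    chunk =>
      if (chunk == terminator) {
        val decisionForThisGroup = dropGroup
        dropGroup = shouldDrop() // decision for the next group
        decisionForThisGroup
      } else dropGroup
  }

  // Hypothetical helper: replays a fixed sequence of decisions.
  // It needs one decision up front plus one per terminator seen.
  def scripted(decisions: Boolean*): () => Boolean = {
    val it = decisions.iterator
    () => it.next()
  }

  // Pairs every chunk with the predicate's answer (true = drop).
  def trace(chunks: List[String]): List[(String, Boolean)] = {
    val predicate = dropGroupPredicate(scripted(true, false, false))
    chunks.map(chunk => (chunk, predicate(chunk)))
  }

  def main(args: Array[String]): Unit =
    trace(List("a1", "a2", "\n", "b1", "\n")).foreach {
      case (chunk, drop) => println(s"${chunk.trim} -> drop=$drop")
    }
}
```

Every chunk of the first group, including its terminator, is marked for dropping, while the whole second group is marked to be kept. The only state carried between chunks is the single `dropGroup` flag.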
    

    Combining the above two functions will drop every other group of ByteStrings. This functionality can then be converted into a Flow:

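    import akka.stream.scaladsl.Flow

    val filterPredicateFunction : ByteString => Boolean =
      dropGroupPredicate(dropEveryOther)

    // The predicate returns true for chunks that should be dropped,
    // so use filterNot (filter would keep exactly those chunks).
    val dropGroups : Flow[ByteString, ByteString, _] =
      Flow[ByteString] filterNot filterPredicateFunction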
    

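    As required, groups of messages do not need to be buffered: the predicate works on individual ByteStrings and therefore uses a constant amount of memory regardless of file size. Note that the predicate closes over mutable state, so a single dropGroups instance should not be materialized more than once or shared between streams.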
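
The constant-memory claim can be illustrated without Akka by running the same predicate over a lazily generated sequence of chunks. This is only a sketch under stated assumptions: `String` stands in for `ByteString`, a plain `Iterator` with `filterNot` stands in for the stream stage, and `chunkStream` and `survivingChunks` are hypothetical helpers introduced for the demonstration.

```scala
object ConstantMemoryDemo {
  // Assumption: Strings stand in for ByteString chunks.
  val terminator = "\n"

  // Alternates false, true, false, ... (one decision per group).
  def dropEveryOther(): () => Boolean = {
    val flags = Iterator.from(1).map(_ % 2 == 0)
    () => flags.next()
  }

  // Per-chunk predicate; its only state is one Boolean flag.
  def dropGroupPredicate(shouldDrop: () => Boolean): String => Boolean = {
    var dropGroup = shouldDrop()
    chunk =>
      if (chunk == terminator) {
        val decisionForThisGroup = dropGroup
        dropGroup = shouldDrop()
        decisionForThisGroup
      } else dropGroup
  }

  // Hypothetical helper: lazily yields `groups` groups, each made of
  // `chunksPerGroup` payload chunks followed by the terminator.
  def chunkStream(groups: Int, chunksPerGroup: Int): Iterator[String] =
    Iterator.range(0, groups).flatMap { g =>
      Iterator.tabulate(chunksPerGroup)(i => s"g$g-c$i") ++ Iterator.single(terminator)
    }

  // Counts the chunks that survive; chunks are inspected one at a time
  // and no group is ever collected into a buffer.
  def survivingChunks(groups: Int, chunksPerGroup: Int): Int =
    chunkStream(groups, chunksPerGroup)
      .filterNot(dropGroupPredicate(dropEveryOther()))
      .size

  def main(args: Array[String]): Unit =
    println(survivingChunks(1000, 3)) // prints 2000
}
```

Of the 1000 groups, the 500 whose decision is `false` pass through with their 3 payload chunks and 1 terminator each, which gives 2000 surviving chunks.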