Scalaz stream group sorted database results


I see a common pattern in my code: I have sorted results from a database and need to emit them as a nested structure. I would like this to stream, keeping as few records in memory at a time as possible. TraversableLike.groupBy assumes that the data is not sorted, so it needlessly fills a mutable map. I would like to keep this truly streaming. Is scalaz-stream useful here?

val sql = """select grandparent_id, parent_id, child_id
  from children
  where grandparent_id = ?
  order by grandparent_id, parent_id, child_id"""

def elementsR[P, R](invoker: Invoker[P, R], param: P): Process[Task, R] =
  // Invoker.elements returns trait CloseableIterator[+T] extends Iterator[T] with Closeable
  resource(Task.delay(invoker.elements(param)))(
    src => Task.delay(src.close)) { src =>
      Task.delay { if (src.hasNext) src.next else throw End }
  }

def dbWookie {
  // grandparent_id, (grandparent_id, parent_id, child_id)
  val invoker = Q.query[Int, (Int, Int, Int)](sql)
  val es = elementsR(invoker, 42)

  // ?, ?, ?

  // nested emits (42, ((35, (1, 3, 7)), (36, (8, 9, 12))))
}

I don't see many functions like foldLeft and scanLeft on Process, so I am not sure how to detect when grandparent_id, parent_id or child_id changes and emit a group. Any ideas?


Solution

  • I think you want something that works in a similar way to chunkBy. chunkBy emits a chunk whenever the result of a predicate function flips from true to false.

    You could generalise this from comparing boolean values to comparing the result of some arbitrary function of the input. You would then have a process that emits a chunk whenever the value of this function applied to the input changes:

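    def chunkOn[I, A](f: I => A): Process1[I, Vector[I]] = {
      // acc holds the current chunk; last is the value of f for its elements
      def go(acc: Vector[I], last: A): Process1[I,Vector[I]] =
        await1[I].flatMap { i =>
          val cur = f(i)
          // key changed: emit the finished chunk and start a new one with i
          if (cur != last) emit(acc) then go(Vector(i), cur)
          else go(acc :+ i, cur)
        } orElse emit(acc) // input exhausted: flush the final chunk
      // seed the first chunk with the first input element
      await1[I].flatMap(i => go(Vector(i), f(i)))
    }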
    

    A quick and dirty test in the REPL, using the Identity monad to force evaluation straight away:

    scala> import scalaz.stream._, scalaz.Id._
    import scalaz.stream._
    import scalaz.Id._
    
    scala> val rows = Seq(('a, 'b, 'c), ('a, 'b, 'd), ('b, 'a, 'c), ('b, 'd, 'a))
    rows: Seq[(Symbol, Symbol, Symbol)] = List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a))
    
    scala> val process = Process.emitSeq[Id, (Symbol, Symbol, Symbol)](rows)
    process: scalaz.stream.Process[scalaz.Id.Id,(Symbol, Symbol, Symbol)] =
      Emit(List(('a,'b,'c), ('a,'b,'d), ('b,'a,'c), ('b,'d,'a)),Halt(scalaz.stream.Process$End$))
    
    scala> process |> chunkOn(_._1)
    res4: scalaz.stream.Process[scalaz.Id.Id,scala.collection.immutable.Vector[(Symbol, Symbol, Symbol)]] =
      Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))
    

    As you suggested, an alternative is chunkWhen, which takes a predicate over the previous and current values and emits a chunk whenever it evaluates to false:

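    def chunkWhen[I](f: (I, I) => Boolean): Process1[I, Vector[I]] = {
      def go(acc: Vector[I]): Process1[I,Vector[I]] =
        await1[I].flatMap { i =>
          acc.lastOption match {
            // predicate fails between the previous element and i: emit the chunk, start a new one
            case Some(last) if ! f(last, i) => emit(acc) then go(Vector(i))
            // first element, or i belongs to the current chunk
            case _ => go(acc :+ i)
          }
        } orElse emit(acc) // input exhausted: flush the final chunk
      go(Vector())
    }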
    

    Trying it out:

    scala> process |> chunkWhen(_._1 == _._1)
    res0: scalaz.stream.Process[scalaz.Id.Id,Vector[(Symbol, Symbol, Symbol)]] =
      Emit(List(Vector(('a,'b,'c), ('a,'b,'d))),Emit(List(Vector(('b,'a,'c), ('b,'d,'a))),Halt(scalaz.stream.Process$End$)))
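
To get from flat chunks to the nested structure asked for in the question, the same key-change idea can be applied once per level. The following is only a sketch of that idea using plain standard-library iterators rather than scalaz-stream, so it runs without the library; `SortedGroups`, `chunkWhenIt`, `chunkOnIt` and `nest` are hypothetical names, not part of any API. It assumes the rows arrive already sorted by (grandparent_id, parent_id, child_id), and it holds one grandparent group in memory at a time.

```scala
// Stdlib-only sketch of the chunk-on-key-change idea; all names here are hypothetical.
object SortedGroups {

  /** Lazily splits `it` into runs; a new run starts whenever `same(prev, cur)` is false. */
  def chunkWhenIt[I](it: Iterator[I])(same: (I, I) => Boolean): Iterator[Vector[I]] =
    new Iterator[Vector[I]] {
      private val src = it.buffered // lets us peek at the next row without consuming it
      def hasNext: Boolean = src.hasNext
      def next(): Vector[I] = {
        var acc = Vector(src.next())
        // keep pulling rows while the next one still belongs to the current run
        while (src.hasNext && same(acc.last, src.head)) acc :+= src.next()
        acc
      }
    }

  /** Splits `it` into runs of consecutive elements that share the same key `f(i)`. */
  def chunkOnIt[I, A](it: Iterator[I])(f: I => A): Iterator[Vector[I]] =
    chunkWhenIt(it)((a, b) => f(a) == f(b))

  // (grandparent_id, parent_id, child_id), as in the query from the question
  type Row = (Int, Int, Int)

  /** Nests sorted rows as (grandparent_id, Vector((parent_id, Vector(child_id)))). */
  def nest(rows: Iterator[Row]): Iterator[(Int, Vector[(Int, Vector[Int])])] =
    chunkOnIt(rows)(_._1).map { gp =>
      // within one grandparent group, chunk again on parent_id
      val parents = chunkOnIt(gp.iterator)(_._2)
        .map(p => (p.head._2, p.map(_._3)))
        .toVector
      (gp.head._1, parents)
    }
}
```

With the sample ids from the question, `SortedGroups.nest(Iterator((42, 35, 1), (42, 35, 3), (42, 35, 7), (42, 36, 8), (42, 36, 9), (42, 36, 12))).toList` yields `List((42, Vector((35, Vector(1, 3, 7)), (36, Vector(8, 9, 12)))))`. The same shape should carry over to scalaz-stream by piping the output of `chunkOn(_._1)` through a map that chunks each group on `_._2`, although that variant is not tested here.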