Search code examples
Tags: scala, playframework, enumerator, loops

Find difference between two enumerators with sorted entries in scala


Given two scala play enumerators A and B that each provide sorted integers, is there a way to derive an enumerator of integers that exist in B that don't exist in A?

For example:

val A: Enumerator[Int] = Enumerator(1,3,5,9,11,13)

and

val B: Enumerator[Int] = Enumerator(1,3,5,7,9,11,13)

I would somehow get:

val C: Enumerator[Int] // This enumerator will output 7

Doing it in a reactive way with enumerators/iteratees/enumeratees is preferred.

One solution I've thought of is to interleave the enumerators and somehow use Iteratee.fold to maintain a buffer to compare the two streams but that seems like it should be unnecessary.


Solution

  • I had a somewhat similar question, "How to merge 2 Enumerators in one, based on merge rule". I modified the answer given there to fit your needs:

    object Disjunction {
    
        /**
         * Merges two enumerators and emits the elements that show up in only one of them.
         *
         * The merge looks only at the current head of each source, so both sources are
         * expected to deliver their elements in ascending order of the implicit `Ordering[E]`.
         * Whenever the two heads differ, the smaller one is emitted and only its source is
         * advanced; whenever they are equal (by `==`), both sources are advanced and nothing
         * is emitted. Once one source is exhausted, the rest of the other one is emitted
         * as is. Consequently this is a symmetric difference: elements of `enumA` missing
         * from `enumB` are emitted as well as elements of `enumB` missing from `enumA`.
         *
         * Both sources are started as soon as the returned enumerator is applied to an
         * iteratee, and each one is held back after every element until the merge asks
         * for its next one.
         *
         * @param enumA first source
         * @param enumB second source
         * @param ec    execution context used for all callbacks and iteratee steps
         * @tparam E    element type; its `Ordering` decides which head is emitted first
         * @return an enumerator of the elements present in only one of the two sources;
         *         it fails if running either source fails
         */
        def disjunction[E: Ordering](enumA: Enumerator[E], enumB: Enumerator[E])(implicit ec: ExecutionContext): Enumerator[E] = new Enumerator[E] {
    
            def apply[A](iter: Iteratee[E, A]): Future[Iteratee[E, A]] = {
                // One step of a source: None once the source is finished, otherwise the
                // promise to complete in order to request the next step, plus the current element.
                case class IterateeReturn(o: Option[(Promise[Promise[IterateeReturn]], E)])
                // A source that is paused on its current element.
                type Cursor = (Promise[Promise[IterateeReturn]], E)
    
                // Completed (with a failure only) as soon as running either source fails.
                val failP: Promise[Nothing] = Promise()
                val failPF: Future[Nothing] = failP.future
    
                // Start both sources; each resulting future yields that source's first step.
                val initState1: Future[Seq[IterateeReturn]] = Future.traverse(Seq(enumA, enumB)) { source =>
                    val firstStep: Promise[IterateeReturn] = Promise[IterateeReturn]()
    
                    val feeder = Iteratee.foldM(firstStep) { (pending: Promise[IterateeReturn], elem: E) =>
                        val gate = Promise[Promise[IterateeReturn]]()
                        // Publish the element together with the gate that releases the next one
                        pending.success(IterateeReturn(Some((gate, elem))))
                        // The fold stays suspended until the merge completes the gate
                        gate.future
                    }
                    // At end of input, answer the last request with an empty step
                    val terminated = feeder.map(lastPending => lastPending.success(IterateeReturn(None)))
    
                    // Propagate a source failure to the merge. tryFailure is used because both
                    // sources may fail, and completing an already completed promise would throw.
                    source.run(terminated).failed.foreach(t => failP.tryFailure(t))
    
                    firstStep.future
                }
    
                val initState: Future[List[Cursor]] = initState1.map(_.flatMap(_.o).toList)
    
                // Releases the source behind `gate` and returns its next step (None when it is finished).
                def advance(gate: Promise[Promise[IterateeReturn]]): Future[Option[Cursor]] = {
                    val reply = Promise[IterateeReturn]()
                    gate.success(reply)
                    reply.future.map(_.o)
                }
    
                val newEnum: Enumerator[Option[E]] = Enumerator.unfoldM(initState) { fstate =>
                    // Whichever comes first: the next merge state, or a failure of one of the sources
                    Future.firstCompletedOf(Seq(fstate, failPF)) map { state =>
                        state.sortBy(_._2) match {
                            case Nil =>
                                // Both sources are exhausted
                                None
                            case (onlyP, elem) :: Nil =>
                                // Only one source is left: emit its element and move it forward
                                val newState: Future[List[Cursor]] = advance(onlyP).map(_.toList)
                                Some((newState, Some(elem)))
                            case (firstP, fe) :: (secondP, se) :: _ =>
                                if (fe != se) {
                                    // Heads differ: emit the smaller one and keep the other source paused
                                    val waiting: Cursor = (secondP, se)
                                    val newState: Future[List[Cursor]] = advance(firstP).map(_.toList :+ waiting)
                                    Some((newState, Some(fe)))
                                } else {
                                    // Heads are equal: drop both and move both sources forward
                                    val newState: Future[List[Cursor]] =
                                        Future.sequence(List(advance(firstP), advance(secondP))).map(_.flatten)
                                    Some((newState, None))
                                }
                        }
                    }
                }
    
                // Steps that emitted nothing (equal heads) are dropped here
                val emittedOnly: Enumeratee[Option[E], E] = Enumeratee.collect[Option[E]] { case Some(e) => e }
                (newEnum &> emittedOnly).apply(iter)
            }
    
        }
    
    }
    

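    I checked, it works. Note that it emits every element that is present in only one of the two enumerators, so elements of A that are missing from B are emitted as well; in your example A contains nothing that B lacks, so only 7 comes out.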