Given two scala play enumerators A and B that each provide sorted integers, is there a way to derive an enumerator of integers that exist in B that don't exist in A?
For example:
val A: Enumerator[Int] = Enumerator(1,3,5,9,11,13)
and
val B: Enumerator[Int] = Enumerator(1,3,5,7,9,11,13)
I would somehow get:
val C: Enumerator[Int] // This enumerator will output 7
Doing it in a reactive way with enumerators/iteratees/enumeratees is preferred.
One solution I've thought of is to interleave the enumerators and somehow use Iteratee.fold to maintain a buffer to compare the two streams but that seems like it should be unnecessary.
I had somewhat similar question How to merge 2 Enumerators in one, based on merge rule I modified given answer, to fit your needs
object Disjunction {
def disjunction[E: Ordering](enumA: Enumerator[E], enumB: Enumerator[E])(implicit ec: ExecutionContext) = new Enumerator[E] {
def apply[A](iter: Iteratee[E, A]) = {
case class IterateeReturn(o: Option[(Promise[Promise[IterateeReturn]], E)])
val failP: Promise[Nothing] = Promise() // Fail promise
val failPF: Future[Nothing] = failP.future // Fail promise future
val initState1: Future[Seq[IterateeReturn]] = Future.traverse(Seq(enumA, enumB)) {
enum =>
val p: Promise[IterateeReturn] = Promise[IterateeReturn]()
// The flow to transform Enumerator in IterateeReturn form
enum.run(Iteratee.foldM(p)({
(oldP: Promise[IterateeReturn], elem: E) =>
val p = Promise[Promise[IterateeReturn]]()
// Return IterateeReturn pointing to the next foldM Future reference, and current element
oldP success IterateeReturn(Some(p, elem))
// Return new Future as a Result of foldM
p.future
}) map ({
promise => promise success IterateeReturn(None) // Finish last promise with empty IterateeReturn
})
) onFailure {
// In case of failure main flow needs to be informed
case t => failP failure t
}
p.future
}
val initState: Future[List[(Promise[Promise[IterateeReturn]], E)]] = initState1 map (_.map(_.o).flatten.toList)
val newEnum: Enumerator[Option[E]] = Enumerator.unfoldM(initState) { fstate =>
// Whatever happens first, fstate returned of failure happened during iteration
Future.firstCompletedOf(Seq(fstate, failPF)) map { state =>
// state is List[(Promise[Promise[IterateeReturn]], E)
// sort elements by E
if (state.isEmpty) {
None
} else if (state.length == 1) {
val (oldP, elem) = state.head
val p = Promise[IterateeReturn]()
oldP success p
// Return newState, with this iterator moved
val newState: Future[List[(Promise[Promise[IterateeReturn]], E)]] = p.future.map(ir => ir.o.map(List(_)).getOrElse(Nil))
Some(newState, Some(elem))
} else {
val sorted = state.sortBy(_._2)
val (firstP, fe) = sorted.head
val (secondP, se) = sorted.tail.head
if (fe != se) {
// Move first and combine with the second
val p = Promise[IterateeReturn]()
firstP success p
val newState: Future[List[(Promise[Promise[IterateeReturn]], E)]] = p.future.map(ir => ir.o.map(List(_, (secondP, se))).getOrElse(List((secondP, se))))
// Return new state
Some(newState, Some(fe))
} else {
// Move future 1
val p1 = Promise[IterateeReturn]()
firstP success p1
val fState: Future[Option[(Promise[Promise[IterateeReturn]], E)]] = p1.future.map(ir => ir.o)
// Move future 2
val p2 = Promise[IterateeReturn]()
secondP success p2
val sState: Future[Option[(Promise[Promise[IterateeReturn]], E)]] = p2.future.map(ir => ir.o)
// Combine in new state
val newState = Future.sequence(List(fState, sState)).map(_.flatten)
// Return
Some(newState , None)
}
}
}
}
newEnum &>
Enumeratee.filter(_.isDefined) &>
Enumeratee.map(_.get) apply iter
}
}
}
I checked, it works.