Using scala 2.11.7, rxscala_2.11 0.25.0, rxjava 1.0.16, my oddFutures
callbacks don't get called in AsyncDisjointedChunkMultiprocessing.process()
package jj.async
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import rx.lang.scala.Observable
import jj.async.helpers._
/* Problem: How to multi-process records asynchronously in chunks.
Processing steps:
- fetch finite # of records from a repository (10 at-a-time (<= 10 for last batch) because of downstream limitations)
- process ea. chunk through a filter asynchronously (has 10-record input limit)
- compute the reverse of the filtered result
- enrich (also has 10-record input limit) filtered results asynchronously
- return enriched filtered results once all records are processed
object AsyncDisjointedChunkMultiprocessing {
private implicit val ec =
def process(): List[Enriched] = {
@volatile var oddsBuffer = Set[Int]()
@volatile var enrichedFutures = Observable just Set[Enriched]()
odds =>
if (odds.size + oddsBuffer.size >= chunkSize) {
val chunkReach = chunkSize - oddsBuffer.size
val poors = oddsBuffer ++ odds take chunkReach
enrichedFutures = enrichedFutures + poors
oddsBuffer = odds drop chunkReach
} else {
oddsBuffer ++= odds
error => throw error,
() => enrichedFutures + oddsBuffer)
private def oddFutures: Observable[Set[Int]] =
Repository.query(chunkSize) { chunk =>
evenFuture(chunk) map {
filtered => chunk -- filtered
private def evenFuture(chunk: Set[Int]): Future[Set[Int]] = {
Future { Remote even chunk }
class Enriched(i: Int)
object Enriched {
def apply(i: Int) = new Enriched(i)
def enrich(poors: Set[Int]): Set[Enriched] = {
poors map { Enriched(_) }
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Set[Int]] = {
implicit val ec =
Observable.from {
f(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
f(Set(11, 12, 13, 14, 15, 16, 17, 18, 19, 20))
f(Set(21, 22, 23, 24, 25))
package object helpers {
val chunkSize = 10
implicit class EnrichedObservable(enrichedObs: Observable[Set[Enriched]]) {
def +(poors: Set[Int]): Observable[Set[Enriched]] = {
enrichedObs merge Observable.just {
def checkSizeLimit(set: Set[_ <: Any]) =
if (set.size > chunkSize) throw new IllegalArgumentException(s"$chunkSize-element limit violated: ${set.size}")
// unmodifiable
object Remote {
def even = { xs: Set[Int] =>
xs filter { _ % 2 == 0 }
Is there something wrong w/ the way I'm creating my Observable.from(Future)
in Repository.query()
The problem is that I am trying to create an observable from multiple futures but Observable.from(Future) only provides for a singular future (the compiler did not complain because I carelessly omitted the separating commas thereby usurping an unsuspecting overload). My sol'n:
object Repository {
def query(f: Set[Int] => Future[Set[Int]])(fetchSize: Int = 10): Observable[Future[Set[Int]]] =
// observable (as opposed to list) because modeling a process
// where the total result size is unknown beforehand.
// Also, not creating or applying because it blocks the futures
(1 to 21 by fetchSize).foldLeft(Observable just Future((Set[Int]()))) { (obs, i) =>
obs + f(DataSource.fetch(i)())
object DataSource {
def fetch(begin: Int)(fetchSize: Int = 10) = {
val end = begin + fetchSize
(for {
i <- begin until end
} yield i).toSet
implicit class FutureObservable(obs: Observable[Future[Set[Int]]]) {
def +(future: Future[Set[Int]]) =
obs merge (Observable just future)