
Scala Observable: unify an Observable with a sequence without updating an intermediate data structure


I have code that calls Couchbase to fetch some rows, as follows:

val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
      couchbaseBucket.async().get(id))

If the input row keys are 1, 2, 3, 4, 5, 6 and only rows 1, 2, 3 exist in the DB, the observable emits only 1, 2, 3.
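The dropping behaviour can be reproduced without RxScala or Couchbase. In this minimal sketch, Scala's `Option` stands in for a per-id lookup that yields either one document or nothing. `lookup` is a hypothetical stand-in for the DB call, assuming only ids 1 to 3 exist. As in the Observable version, `flatMap` contributes no element for an empty result, so the missing ids vanish.

```scala
object FlatMapDropsMissing {
  // Hypothetical stand-in for the DB lookup: only ids 1..3 "exist".
  def lookup(id: Int): Option[String] =
    if (id <= 3) Some(s"Document #$id") else None

  val ids: Seq[Int] = Seq(1, 2, 3, 4, 5, 6)

  // Each lookup yields zero or one element and flatMap concatenates them,
  // so the ids whose lookup is empty (4, 5, 6) simply disappear.
  val found: Seq[Int] = ids.flatMap(id => lookup(id).map(_ => id))

  def main(args: Array[String]): Unit =
    println(found) // List(1, 2, 3)
}
```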

My requirement, however, is to return a map in which 1, 2, 3 are true (they exist in the DB) and 4, 5, 6 are false (they do not exist in the DB). I managed to do that with a Scala Observable, but I'm using an intermediate map to build the complete result containing all ids. Below is sample code that simulates my problem:

import java.util.concurrent.ConcurrentHashMap

import rx.lang.scala.Observable

object Main extends App {

  val idsToFetch = Seq(1,2,3,4,5,6)

  println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}

  private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
    val inAndNotInDB = new ConcurrentHashMap[Int, Boolean]
    // - How can I avoid the additional data structure (here a map),
    //   so that the function still returns a map of all ids, each
    //   marked with whether it exists in the DB?
    // - I want the function to return a map, but I don't want to
    //   populate that map inside the observer; that is a small side
    //   effect, and I would rather just transform the stream.

    Observable.from(idsToFetch)
      .filter(x => !(x == 4 || x == 5 || x == 6)) // Simulate the DB fetch: 4, 5, 6 do not exist, so they are not returned.
      .subscribe(
        x => inAndNotInDB.put(x, true),
        e => println(e),
        () => idsToFetch.filterNot(id => inAndNotInDB.containsKey(id))
          .foreach(id => inAndNotInDB.put(id, false)) // Mark every id that was not found as false.
      )

    // Returning the map right after subscribe only works because
    // Observable.from emits synchronously in this simulation.
    inAndNotInDB
  }

}

Is there any way to do this without the intermediate map, i.e. without populating a separate data structure and only by manipulating the stream? The current version does not look clean. Thanks.


Solution

  • Your problem comes from the fact that you use flatMap: if there is no data in the DB for a given id, you get an empty Observable, and flatMap simply produces no output for that id. What you need is defaultIfEmpty, which is called orElse in RxScala. You can use orElse inside flatMap to return a default value. Modifying your example:

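    import rx.lang.scala.Observable

    // Simulated DB lookup: only ids 1..3 exist; any other id yields an empty Observable.
    def fetchFromDb(id: Int): Observable[String] = {
      if (id <= 3)
        Observable.just(s"Document #$id")
      else
        Observable.empty
    }

    // orElse (RxJava's defaultIfEmpty) emits (id, false) when the lookup is empty,
    // so every id produces exactly one pair.
    def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
      Observable.from(idsToFetch).flatMap((id: Int) =>
        fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
    }

    println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)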
    

    which prints

    List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))

    Alternatively, you can use Option to return Some(JsonDocument) or None:

    // Emits (id, Some(document)) when found, otherwise the default (id, None).
    def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
      Observable.from(idsToFetch).flatMap((id: Int) =>
        fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
    }
    
    println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
    

    which prints

    List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))
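As a rough analogue using only the standard library, Scala's `Option` can stand in for the per-id Observable. `Option.orElse` supplies a default much as RxScala's `orElse` (RxJava's `defaultIfEmpty`) does, and `toMap` then builds the id-to-boolean map from the question with no mutable state. `lookup` and `existence` are hypothetical names. One difference: `Option.orElse` takes an alternative `Option`, whereas the Observable variant takes a plain default item.

```scala
object DefaultIfEmptyToMap {
  // Hypothetical stand-in for fetchFromDb: only ids 1..3 "exist".
  def lookup(id: Int): Option[String] =
    if (id <= 3) Some(s"Document #$id") else None

  // Every id yields exactly one pair: (id, true) when the lookup produced
  // a value, otherwise the (id, false) default supplied by orElse.
  def existence(ids: Seq[Int]): Map[Int, Boolean] =
    ids
      .flatMap(id => lookup(id).map(_ => id -> true).orElse(Some(id -> false)))
      .toMap

  def main(args: Array[String]): Unit =
    println(existence(Seq(1, 2, 3, 4, 5, 6)))
}
```

With RxScala, the pairs emitted by gotValue can be collected the same way, e.g. `gotValue(ids).toBlocking.toList.toMap`, which blocks until the stream completes. Because flatMap merges the inner Observables, a genuinely asynchronous source such as Couchbase may emit the pairs out of input order. A Map is unaffected by that.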