I have a code which calls couchbase for getting some rows as following:
val gotValues: Observable[JsonDocument] = Observable.from(rowKeys).flatMap(id =>
If I have 1,2,3,4,5,6 as input row-keys and only rows 1,2,3 exist in the DB then the observable would get notified only about 1,2,3.
My requirement however is that I return a map with 1,2,3 true (exist in db) and 4,5,6 with false, (meaning not exist in DB). I managed to do that with scala observable however i'm using an intermediate map data structure to return the total map which contains all ids. Below is a sample code which simulates my problem..
object Main extends App {
import rx.lang.scala.Observable
val idsToFetch = Seq(1,2,3,4,5,6)
println(isInDBOrNot()) // {1=true, 2=true, 3=true, 4=false, 5=false, 6=false}
private def isInDBOrNot(): ConcurrentHashMap[Int, Boolean] = {
val inAndNotInDB = new java.util.concurrent.ConcurrentHashMap[Int, Boolean]
// - How can I avoid the additional data structure?
// - In this case a map, so that the function will return
// a map with all numbers and for each if exist in DB?
// - I mean I want the function to return a map I don't
// want to populate that map inside the observer,
// it's like a mini side effect I would rather simply
// manipulate the stream.
.filterNot(x => x == 4 || x == 5 || x == 6) // Simulate fetch from DB, 4,5,6 do not exist in DB, so not returned.
x => inAndNotInDB.put(x, true),
e => println(e),
() => idsToFetch.filterNot(inAndNotInDB.containsKey)
.foreach(inAndNotInDB.put(_, false)) // mark all non-found as false.
Anyway to do that without the intermediate map (without populating an intermedia data structure, but only by manipulating the stream)? It does not look clean!! . Thanks.
You problem seems to arise from the fact that you use flatMap
so if there is no data in the DB for a given id
and you get an empty Observable
, flatMap
just produces no output for such id
. So it looks like what you need is defaultIfEmpty which is translated to Scala's orElse
. You can use orElse
to return some default value inside flatMap
. So to modify your example:
def fetchFromDb(id: Int): Observable[String] = {
if (id <= 3)
Observable.just(s"Document #$id")
def gotValue(idsToFetch: Seq[Int]): Observable[(Int, Boolean)] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(_ => (id, true)).orElse((id, false)))
println(gotValue(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
which prints
List((1,true), (2,true), (3,true), (4,false), (5,false), (6,false))
Or you can use Option
to return Some(JsonDocument)
or None
such as
def gotValueEx(idsToFetch: Seq[Int]): Observable[(Int, Option[String])] = {
Observable.from(idsToFetch).flatMap((id: Int) => fetchFromDb(id).map(doc => (id, Option(doc))).orElse((id, None)))
println(gotValueEx(Seq(1, 2, 3, 4, 5, 6)).toBlocking.toList)
which prints
List((1,Some(Document #1)), (2,Some(Document #2)), (3,Some(Document #3)), (4,None), (5,None), (6,None))