reactive-programming rxjs5

RxJS5 groupBy dropping the first item from first group?


I'm having difficulty seeing why this code drops the first element.

const Rx = require("@reactivex/rxjs")

const requestQueue = Rx.Observable.from([
  {type: "show.json", value: "1"},
  {type: "tweets.json", value: "A"},
  {type: "show.json", value: "2"},
  {type: "tweets.json", value: "B"}
])

requestQueue.groupBy(request => request.type).bufferCount(2).mergeMap(requestSubstreams => {
  return requestSubstreams[0].merge(requestSubstreams[1])
}).subscribe(x => console.log(x))

The output I'm getting is:

{ type: 'tweets.json', value: 'A' }
{ type: 'show.json', value: '2' }
{ type: 'tweets.json', value: 'B' }

I've run some experiments, and it's always the first element that gets dropped.

This is the link to RxFiddle: http://rxfiddle.net/#type=editor&code=Y29uc3QgcmVxdWVzdFF1ZXVlID0gUnguT2JzZXJ2YWJsZS5mcm9tKFsKICB7dHlwZTogInNob3cuanNvbiIsIHZhbHVlOiAiMSJ9LAogIHt0eXBlOiAidHdlZXRzLmpzb24iLCB2YWx1ZTogIkEifSwKICB7dHlwZTogInNob3cuanNvbiIsIHZhbHVlOiAiMiJ9LAogIHt0eXBlOiAidHdlZXRzLmpzb24iLCB2YWx1ZTogIkIifQpdKQoKcmVxdWVzdFF1ZXVlLmdyb3VwQnkocmVxdWVzdCA9PiByZXF1ZXN0LnR5cGUpLmJ1ZmZlckNvdW50KDIpLm1lcmdlTWFwKHJlcXVlc3RTdWJzdHJlYW1zID0+IHsKICByZXR1cm4gcmVxdWVzdFN1YnN0cmVhbXNbMF0ubWVyZ2UocmVxdWVzdFN1YnN0cmVhbXNbMV0pCn0pLnN1YnNjcmliZSh4ID0+IGNvbnNvbGUubG9nKHgpKQ== ... but I haven't figured out how to interpret it.

I had to resort to this code (using partition) to get the result I was expecting:

const requestQueue = Rx.Observable.from([
  {type: "show.json", value: "1"},
  {type: "tweets.json", value: "A"},
  {type: "show.json", value: "2"},
  {type: "tweets.json", value: "B"}
])

const requestSubstreams = requestQueue.partition(request => request.type == "show.json")
const showJsonSubstream = requestSubstreams[0]
const tweetsJsonSubstream = requestSubstreams[1]

showJsonSubstream.merge(tweetsJsonSubstream).subscribe(x => console.log(x))

Can you help point out what's the issue / caveat with groupBy?


Solution

  • To @cartant: it has to do with the code below, in which the letters and numbers come from separate observables.

    const letterArr = ["A", "B", "C", "D", "E", "F", "G", "H", "I"]
    const letterStream = Rx.Observable.zip(
      Rx.Observable.from(letterArr),
      Rx.Observable.range(0, letterArr.length - 1), 
      (letter, index) => Rx.Observable.of({letter, index}).delay(
        index == 0 ? 0 : index < 5 ? (index == 2 ? 2000 : 5000) : 20000000
      )
    ).concatAll().multicast(new Rx.Subject())
    //A emitted at 0s, B at 5s, C at 7s, D at 12s, E at 17s, F at ... never
    
    const numberStream = Rx.Observable.zip(
      Rx.Observable.range(1, 20),
      Rx.Observable.interval(2000).startWith(-1).map(x => x +1),
      (number, index) => ({number, index})
    ).multicast(new Rx.Subject())
    //1 emitted at 0s, 2 at 2s, 3 at 4s, ...
    
    const slowedDownNumberStream = Rx.Observable.zip(
      numberStream,
      Rx.Observable.zip(
        letterStream,
        Rx.Observable.interval(10000).startWith(-1).map(x => x +1)
      ).timeoutWith(11000, Rx.Observable.interval(2000).startWith(-1)),
      (number, index) => ({number, index})
    )
    
    Rx.Observable.merge(letterStream, slowedDownNumberStream).subscribe(x => console.log(new Date(), " > ", x))
    letterStream.connect()
    numberStream.connect()
    

    RxFiddle: http://rxfiddle.net/#type=editor&code=Y29uc3QgbGV0dGVyQXJyID0gWyJBIiwgIkIiLCAiQyIsICJEIiwgIkUiLCAiRiIsICJHIiwgIkgiLCAiSSJdCmNvbnN0IGxldHRlclN0cmVhbSA9IFJ4Lk9ic2VydmFibGUuemlwKAogIFJ4Lk9ic2VydmFibGUuZnJvbShsZXR0ZXJBcnIpLAogIFJ4Lk9ic2VydmFibGUucmFuZ2UoMCwgbGV0dGVyQXJyLmxlbmd0aCAtIDEpLCAKICAobGV0dGVyLCBpbmRleCkgPT4gUnguT2JzZXJ2YWJsZS5vZih7bGV0dGVyLCBpbmRleH0pLmRlbGF5KAogICAgaW5kZXggPT0gMCA/IDAgOiBpbmRleCA8IDUgPyAoaW5kZXggPT0gMiA/IDIwMDAgOiA1MDAwKSA6IDIwMDAwMDAwCiAgKQopLmNvbmNhdEFsbCgpLm11bHRpY2FzdChuZXcgUnguU3ViamVjdCgpKQovL0EgZW1pdHRlZCBhdCAwcywgQiBhdCA1cywgQyBhdCA3cywgRCBhdCAxMnMsIEUgYXQgMTdzLCBGIGF0IC4uLiBuZXZlcgoKY29uc3QgbnVtYmVyU3RyZWFtID0gUnguT2JzZXJ2YWJsZS56aXAoCiAgUnguT2JzZXJ2YWJsZS5yYW5nZSgxLCAyMCksCiAgUnguT2JzZXJ2YWJsZS5pbnRlcnZhbCgyMDAwKS5zdGFydFdpdGgoLTEpLm1hcCh4ID0+IHggKzEpLAogIChudW1iZXIsIGluZGV4KSA9PiAoe251bWJlciwgaW5kZXh9KQopLm11bHRpY2FzdChuZXcgUnguU3ViamVjdCgpKQovLzEgZW1pdHRlZCBhdCAwcywgMiBhdCAycywgMyBhdCA0cywgLi4uCgpjb25zdCBzbG93ZWREb3duTnVtYmVyU3RyZWFtID0gUnguT2JzZXJ2YWJsZS56aXAoCiAgbnVtYmVyU3RyZWFtLAogIFJ4Lk9ic2VydmFibGUuemlwKAogICAgbGV0dGVyU3RyZWFtLAogICAgUnguT2JzZXJ2YWJsZS5pbnRlcnZhbCgxMDAwMCkuc3RhcnRXaXRoKC0xKS5tYXAoeCA9PiB4ICsxKQogICkudGltZW91dFdpdGgoMTEwMDAsIFJ4Lk9ic2VydmFibGUuaW50ZXJ2YWwoMjAwMCkuc3RhcnRXaXRoKC0xKSksCiAgKG51bWJlciwgaW5kZXgpID0+ICh7bnVtYmVyLCBpbmRleH0pCikKClJ4Lk9ic2VydmFibGUubWVyZ2UobGV0dGVyU3RyZWFtLCBzbG93ZWREb3duTnVtYmVyU3RyZWFtKS5zdWJzY3JpYmUoeCA9PiBjb25zb2xlLmxvZyhuZXcgRGF0ZSgpLCAiID4gIiwgeCkpCmxldHRlclN0cmVhbS5jb25uZWN0KCkKbnVtYmVyU3RyZWFtLmNvbm5lY3QoKQ==

    As you can see, I use letterStream inside slowedDownNumberStream.

    My situation now is that I have a single input stream (where letters and numbers are mixed), so I need to split it using groupBy. Since I need both groups inside the mergeMap, I put bufferCount between the groupBy and the mergeMap.
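
One likely explanation for the dropped item, given the groupBy + bufferCount(2) + mergeMap arrangement in the question: the groups emitted by groupBy behave like hot subjects, so a value pushed into a group before anything subscribes to it is lost. bufferCount(2) holds the first group back until the second group appears, and by then "1" has already been pushed into the first group. The sketch below simulates that ordering in plain JavaScript. TinySubject and tinyGroupBy are hypothetical stand-ins written only for illustration; they are not RxJS code, and the real operator internals may differ in detail.

```javascript
// Hypothetical stand-in for a hot subject (NOT the RxJS implementation):
// values pushed while there are no observers are simply lost.
class TinySubject {
  constructor() { this.observers = []; }
  subscribe(observer) { this.observers.push(observer); }
  next(value) { this.observers.forEach(observer => observer(value)); }
}

// Simplified groupBy (an assumption about the ordering, for illustration):
// a new group is handed downstream first, then the current item is pushed into it.
function tinyGroupBy(items, keyOf, onGroup) {
  const groups = new Map();
  for (const item of items) {
    const key = keyOf(item);
    if (!groups.has(key)) {
      groups.set(key, new TinySubject());
      onGroup(groups.get(key));
    }
    groups.get(key).next(item);
  }
}

const requests = [
  {type: "show.json", value: "1"},
  {type: "tweets.json", value: "A"},
  {type: "show.json", value: "2"},
  {type: "tweets.json", value: "B"}
];

const received = [];
const buffer = [];

tinyGroupBy(requests, request => request.type, group => {
  // Mimics bufferCount(2): nothing subscribes until two groups have arrived.
  buffer.push(group);
  if (buffer.length === 2) {
    buffer.forEach(g => g.subscribe(x => received.push(x.value)));
  }
});

console.log(received); // [ 'A', '2', 'B' ]
```

In this simulation "1" is pushed while the first group still has no subscriber, which matches the output shown in the question. If that is indeed the cause, one thing worth checking against the RxJS 5 documentation is groupBy's optional fourth argument (subjectSelector), which, as far as I know, lets each group be backed by a ReplaySubject so that late subscribers still receive earlier values.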