I'm having difficulties to see why this code is dropping the first element?
const Rx = require("@reactivex/rxjs")
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
requestQueue.groupBy(request => request.type).bufferCount(2).mergeMap(requestSubstreams => {
return requestSubstreams[0].merge(requestSubstreams[1])
}).subscribe(x => console.log(x))
The output I'm getting is:
{ type: 'tweets.json', value: 'A' }
{ type: 'show.json', value: '2' }
{ type: 'tweets.json', value: 'B' }
Been doing some experiments, it's always the first element that gets dropped.
This is the link to RxFiddle: http://rxfiddle.net/#type=editor&code=Y29uc3QgcmVxdWVzdFF1ZXVlID0gUnguT2JzZXJ2YWJsZS5mcm9tKFsKICB7dHlwZTogInNob3cuanNvbiIsIHZhbHVlOiAiMSJ9LAogIHt0eXBlOiAidHdlZXRzLmpzb24iLCB2YWx1ZTogIkEifSwKICB7dHlwZTogInNob3cuanNvbiIsIHZhbHVlOiAiMiJ9LAogIHt0eXBlOiAidHdlZXRzLmpzb24iLCB2YWx1ZTogIkIifQpdKQoKcmVxdWVzdFF1ZXVlLmdyb3VwQnkocmVxdWVzdCA9PiByZXF1ZXN0LnR5cGUpLmJ1ZmZlckNvdW50KDIpLm1lcmdlTWFwKHJlcXVlc3RTdWJzdHJlYW1zID0+IHsKICByZXR1cm4gcmVxdWVzdFN1YnN0cmVhbXNbMF0ubWVyZ2UocmVxdWVzdFN1YnN0cmVhbXNbMV0pCn0pLnN1YnNjcmliZSh4ID0+IGNvbnNvbGUubG9nKHgpKQ== ... but I haven't figured out how to interpret it.
I have to resort to this code (using partition) in order to get the result I was expecting:
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
const requestSubstreams = requestQueue.partition(request => request.type == "show.json")
const showJsonSubstream = requestSubstreams[0]
const tweetsJsonSubstream = requestSubstreams[1]
showJsonSubstream.merge(tweetsJsonSubstream).subscribe(x => console.log(x))
Can you help point out what's the issue / caveat with groupBy?
To @cartant : it has to do with this code (in this code) the letters and numbers are from separate observables.
const letterArr = ["A", "B", "C", "D", "E", "F", "G", "H", "I"]
const letterStream = Rx.Observable.zip(
Rx.Observable.from(letterArr),
Rx.Observable.range(0, letterArr.length - 1),
(letter, index) => Rx.Observable.of({letter, index}).delay(
index == 0 ? 0 : index < 5 ? (index == 2 ? 2000 : 5000) : 20000000
)
).concatAll().multicast(new Rx.Subject())
//A emitted at 0s, B at 5s, C at 7s, D at 12s, E at 17s, F at ... never
const numberStream = Rx.Observable.zip(
Rx.Observable.range(1, 20),
Rx.Observable.interval(2000).startWith(-1).map(x => x +1),
(number, index) => ({number, index})
).multicast(new Rx.Subject())
//1 emitted at 0s, 2 at 2s, 3 at 4s, ...
const slowedDownNumberStream = Rx.Observable.zip(
numberStream,
Rx.Observable.zip(
letterStream,
Rx.Observable.interval(10000).startWith(-1).map(x => x +1)
).timeoutWith(11000, Rx.Observable.interval(2000).startWith(-1)),
(number, index) => ({number, index})
)
Rx.Observable.merge(letterStream, slowedDownNumberStream).subscribe(x => console.log(new Date(), " > ", x))
letterStream.connect()
numberStream.connect()
You see, I use the letterStream in the slowedDownNumberStream.
My situation now, I have one single stream as input (where letters and numbers are mixed), so I need to partition them, using groupBy..., and since I need both inside the mergeMap, so I put bufferCount between the groupBy and mergeMap.