Say I have an observable that looks like this (this is Python, but should be general to all languages):
rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])
I want to ultimately be able to batch the Integers into lists of length 5 and be able to pass those to a function, so something like this:
batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])
In reality, the incoming data will be an inifinite stream of (potentially empty) lists. I just want to make sure that my subsequent calls to batch_function
are not made until I've accumulated 5 actual values. Thanks for your help.
The following snippet is working for me using buffer_with_count
. I'm not sure if there's a more parsimonious way to do it, though, i.e., without the flat_map
.
BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
.flat_map(lambda it: it) \
.buffer_with_count(BUFFER_COUNT) \
.map(lambda my_array: do_something_with(my_array)) \
.subscribe(lambda it: print(it))