Batching Results from ReactiveX Observable


Say I have an observable that looks like this (this is Python, but should be general to all languages):

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

Ultimately, I want to batch the integers into lists of length 5 and pass each list to a function, something like this:

batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])

In reality, the incoming data will be an infinite stream of (potentially empty) lists. I just want to make sure that each call to batch_function is not made until I've accumulated 5 actual values. Thanks for your help.


Solution

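  • The following snippet works for me using buffer_with_count: flat_map flattens each incoming list into individual values (so empty lists contribute nothing), and buffer_with_count then groups those values into lists of 5. I'm not sure whether there is a more parsimonious way to do it, i.e., without the flat_map.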

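    import rx  # chained-operator style, as in RxPY 1.x

    BUFFER_COUNT = 5
    # get_items() and do_something_with() stand in for your own source and batch function.
    # flat_map emits the values of each inner list one by one;
    # buffer_with_count collects them into lists of BUFFER_COUNT values.
    rx.Observable.from_iterable(iter(get_items())) \
      .flat_map(lambda it: it) \
      .buffer_with_count(BUFFER_COUNT) \
      .map(lambda my_array: do_something_with(my_array)) \
      .subscribe(lambda it: print(it))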
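
To make the flatten-then-buffer idea concrete, here is a minimal sketch of the same two steps using plain Python generators instead of Rx. It is only an illustration of the logic, not how RxPY implements it: the `batches` helper and the `source` variable are hypothetical names, and this version deliberately drops a trailing partial batch, which only matters when the input is finite.

```python
from itertools import chain, islice


def batches(lists, size):
    """Flatten an iterable of lists and yield only full batches of `size` values.

    Hypothetical helper for illustration; works lazily, so `lists` may be infinite.
    """
    # Flattening step (the role flat_map plays above): empty lists vanish.
    flat = chain.from_iterable(lists)
    while True:
        # Buffering step (the role buffer_with_count plays above).
        batch = list(islice(flat, size))
        if len(batch) < size:
            return  # input ended before a full batch accumulated
        yield batch


# The sample data from the question.
source = [[1], [], [2], [3], [], [], [4], [5, 6], [7], [8, 9], [10]]

for batch in batches(source, 5):
    print(batch)
```

Each value printed by the loop is one full batch, which is where a call to batch_function would go.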