RxPy3 group_by creates "groupedobservables" but flat_map doesn't work - Reactive Python for Data Science Refactor


Having decided to learn RxPy, I took the free course Reactive Python for Data Science from O'Reilly.

I quickly realised that the code was written for Python 3.5 and RxPy2, so I forked the original repo and decided to learn by refactoring the code for RxPy3.

The original code for version 2 was:

from rx import Observable

items = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]

Observable.from_(items) \
    .group_by(lambda s: len(s)) \
    .flat_map(lambda grp: grp.to_list()) \
    .subscribe(lambda i: print(i))

I've learned enough to import from_ and operators and to use `.pipe` to chain the operators together.

So far I have got to:

from rx import from_, operators as ops

items = ["Alpha", "Beta", "Gamma", "Delta", "Epsilon"]

from_(items).pipe(
    ops.group_by(lambda s: len(s)),
    ops.flat_map(lambda grp: grp.to_list())  # Todo grp.to_list() of a groupedobservable is not working - fix it
).subscribe(lambda i: print(i))

The problem is that ops.group_by emits a stream of GroupedObservable objects, and the grp.to_list() call inside ops.flat_map does not turn them into grouped lists.

The original code is here: Reactive Python for Data Science

My refactored code is forked here: Reactive Python RxPy3. The lesson is the code_examples file 6.4A_grouping_into_lists.py.


Solution

  • Since to_list is an operator in RxPy3 rather than a method on the observable, it has to be applied to each group through a nested pipe:

    from rx import of, operators as ops
    
    of("Alpha", "Beta", "Gamma", "Delta", "Epsilon").pipe(
        ops.group_by(lambda s: len(s)),
        ops.flat_map(lambda grp: grp.pipe(ops.to_list()))  
    ).subscribe(lambda i: print(i))
    

    Result:

    ['Alpha', 'Gamma', 'Delta']
    ['Beta']
    ['Epsilon']