Search code examples
pythonreactivexrx-py

How can I extract values from an Rx Observable?


I am attempting to integrate some ReactiveX concepts into an existing project, thinking it might be good practice and a way to make certain tasks cleaner.

I open a file, create an Observable from its lines, then do some filtering until I get just the lines I want. Now, I want to extract some information from two of those lines using re.search() to return particular groups. I can't for the life of me figure out how to get such values out of an Observable (without assigning them to globals).

train = 'ChooChoo'

with open(some_file) as fd:
    line_stream = Observable.from_(fd.readlines())

a_stream = line_stream.skip_while(
        # Begin at dictionary
        lambda x: 'config = {' not in x
    ).skip_while(
        # Begin at train key
        lambda x: "'" + train.lower() + "'" not in x
    ).take_while(
        # End at closing brace of dict value
        lambda x: '}' not in x
    ).filter(
        # Filter sdk and clang lines only
        lambda x: "'sdk'" in x or "'clang'" in x
    ).subscribe(lambda x: match_some_regex(x))

In place of .subscribe() at the end of that stream, I have tried using .to_list() to get a list over which I can iterate "the normal way," but it only returns a value of type:

<class 'rx.anonymousobservable.AnonymousObservable'>

What am I doing wrong here?

Every Rx example I have ever seen does nothing but print results. What if I want them in a data structure I can use synchronously?


Solution

  • For the short term, I implemented the feature I wanted using itertools (as suggested by @jonrsharpe). Still the problem grated at back of my mind, so I came back to it today and figured it out.

    This is not a good example of Rx, since it only uses a single thread, but at least now I know how to break out of "the monad" when need be.

    #!/usr/bin/env python
    
    from __future__ import print_function
    from rx import *
    
    def my_on_next(item):
        print(item, end="", flush=True)
    
    def my_on_error(throwable):
        print(throwable)
    
    def my_on_completed():
        print('Done')
        pass
    
    def main():
        foo = []
    
        # Create an observable from a list of numbers
        a = Observable.from_([14, 9, 5, 2, 10, 13, 4])
    
        # Keep only the even numbers
        b = a.filter(lambda x: x % 2 == 0)
    
        # For every item, call a function that appends the item to a local list
        c = b.map(lambda x: foo.append(x))
        c.subscribe(lambda x: x, my_on_error, my_on_completed)
    
        # Use the list outside the monad!
        print(foo)
    
    if __name__ == "__main__":
        main()
    

    This example is rather contrived, and all the intermediate observables aren't necessary, but it demonstrates that you can easily do what I originally described.