I am attempting to integrate some ReactiveX concepts into an existing project, thinking it might be good practice and a way to make certain tasks cleaner.
I open a file, create an Observable from its lines, then do some filtering until I get just the lines I want. Now, I want to extract some information from two of those lines using re.search() to return particular groups. I can't for the life of me figure out how to get such values out of an Observable (without assigning them to globals).
train = 'ChooChoo'
with open(some_file) as fd:
line_stream = Observable.from_(fd.readlines())
a_stream = line_stream.skip_while(
# Begin at dictionary
lambda x: 'config = {' not in x
).skip_while(
# Begin at train key
lambda x: "'" + train.lower() + "'" not in x
).take_while(
# End at closing brace of dict value
lambda x: '}' not in x
).filter(
# Filter sdk and clang lines only
lambda x: "'sdk'" in x or "'clang'" in x
).subscribe(lambda x: match_some_regex(x))
In place of .subscribe()
at the end of that stream, I have tried using .to_list()
to get a list over which I can iterate "the normal way," but it only returns a value of type:
<class 'rx.anonymousobservable.AnonymousObservable'>
What am I doing wrong here?
Every Rx example I have ever seen does nothing but print results. What if I want them in a data structure I can use synchronously?
For the short term, I implemented the feature I wanted using itertools (as suggested by @jonrsharpe). Still the problem grated at back of my mind, so I came back to it today and figured it out.
This is not a good example of Rx, since it only uses a single thread, but at least now I know how to break out of "the monad" when need be.
#!/usr/bin/env python
from __future__ import print_function
from rx import *
def my_on_next(item):
print(item, end="", flush=True)
def my_on_error(throwable):
print(throwable)
def my_on_completed():
print('Done')
pass
def main():
foo = []
# Create an observable from a list of numbers
a = Observable.from_([14, 9, 5, 2, 10, 13, 4])
# Keep only the even numbers
b = a.filter(lambda x: x % 2 == 0)
# For every item, call a function that appends the item to a local list
c = b.map(lambda x: foo.append(x))
c.subscribe(lambda x: x, my_on_error, my_on_completed)
# Use the list outside the monad!
print(foo)
if __name__ == "__main__":
main()
This example is rather contrived, and all the intermediate observables aren't necessary, but it demonstrates that you can easily do what I originally described.