Tags: python, reactive-programming, rx-py

Why does an RxPY Observable act as an infinite iterable?


I had a bug where I accidentally used an Observable as an iterable. For most objects, this mistake is easy to detect:

>>> tuple(object())

Traceback (most recent call last):
  File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-4-40e3dfc60da8>", line 1, in <module>
    tuple(object())
TypeError: 'object' object is not iterable

However, for an Rx Observable, the same mistake silently crashes Python.

Minimal working example:

from rx import Observable

observable = Observable.from_list([1,2,3])
tuple(observable)  # Python will die silently here

There is no traceback and no indication that anything is wrong. This makes already hard-to-debug concurrent reactive code even harder to debug; it took me two hours to track this one down.


On closer inspection, iterating over an Observable appears to create a new observable on every step. I have no idea where they come from, given that Observable has no __iter__ method.

>>> for i, x in enumerate(observable):
...     print(x)
...     if i > 100:  # To prevent Python from crashing
...         break

<rx.anonymousobservable.AnonymousObservable object at 0x03111710>
<rx.anonymousobservable.AnonymousObservable object at 0x03111850>
<rx.anonymousobservable.AnonymousObservable object at 0x03111990>
<rx.anonymousobservable.AnonymousObservable object at 0x03111AD0>
<rx.anonymousobservable.AnonymousObservable object at 0x03111C10>
<rx.anonymousobservable.AnonymousObservable object at 0x03111D50>
<rx.anonymousobservable.AnonymousObservable object at 0x03111E90>
etc...
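
One general Python mechanism that could explain iteration without an __iter__ method is the legacy sequence protocol: if an object defines __getitem__, iter() falls back to calling it with 0, 1, 2, ... until IndexError is raised. Whether RxPY's Observable relies on this is an assumption here, not something confirmed by its source. The sketch below uses a hypothetical class, IndexableOnly, that only illustrates the fallback and has nothing to do with RxPY's actual code.

```python
from itertools import islice


class IndexableOnly(object):
    """Hypothetical stand-in: defines __getitem__ but no __iter__."""

    def __getitem__(self, index):
        # Never raises IndexError, so plain iteration would never stop.
        return "item-%d" % index


source = IndexableOnly()

# iter() falls back to __getitem__(0), __getitem__(1), ...
# islice bounds the otherwise endless iteration.
first_three = list(islice(source, 3))
print(first_three)  # ['item-0', 'item-1', 'item-2']
```

Calling tuple(source) on such an object would keep consuming memory until the process dies, which matches the symptom described above.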

Is this a bug, or a feature? Are Observables intended to be iterable?


Solution

  • I ran into the same problem. In fact, I crashed my computer by accidentally turning an observable into a list, which was a little funny.

    I'm no expert (I was thrown in at the deep end a week ago), but this appears to be a feature. Observables are non-blocking by default, so if nothing is available yet, you simply get back an empty observable. It's like a file opened with the O_NONBLOCK flag: a read call returns immediately with an empty string.

    If you want to use it as a blocking iterator, use to_blocking(). You can then do something like:

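    from rx import Observable
    o = Observable.from_([1, 2, 3, 4])
    # to_blocking() gives a blocking view that can be iterated safely
    i = iter(o.to_blocking())
    # Consumes the items one by one and stops when the observable completes
    list(i)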
    

    And it will work perfectly.

    Edit: I just discovered why this happens. There is a clash between Python's iterator protocol and one of the methods on an Observable, the next() method. The documentation says:

    next() — returns an iterable that blocks until the Observable emits another item, then returns that item.

    In Python, next() is expected to return the next item of the iterable and to raise StopIteration when there are no more items. Here it never signals the end, so you get an infinite supply of observables, each of which promises to deliver an item at some future time.
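
To make the clash concrete, here is a minimal sketch of an iterator whose next method hands back a fresh placeholder on every call and never raises StopIteration. The classes Pending and NeverEnding are hypothetical and only mimic the behaviour described above; they are not RxPY's implementation.

```python
from itertools import islice


class Pending(object):
    """Hypothetical placeholder for a value that arrives later."""


class NeverEnding(object):
    """Hypothetical iterator that never signals the end of iteration."""

    def __iter__(self):
        return self

    def __next__(self):
        # Returns a new placeholder each time and never raises StopIteration.
        return Pending()

    next = __next__  # Python 2 spelling of the same method


# list(NeverEnding()) would never finish; islice caps it at five items.
bounded = list(islice(NeverEnding(), 5))
print(len(bounded))  # 5
```

Bounding the iteration with islice is a handy way to inspect such an object while debugging without taking down the interpreter.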