Tags: python, reactive-programming, rx-py

Partitioned Observable second stream never reached


I have an Observable handling a web request, and I want to handle success and failure in separate streams, very similar to this example. The main difference between my script and the example is that I do not want to merge the streams and then subscribe. I'm using RxPY 1.6.1 with Python 2.7.

request = Observable.of(requests.get(self.URL, params=request_params))

request_success, request_failed = request.partition(lambda r: r.status_code == requests.codes.ok)          

request_failed.subscribe(lambda r: print_msg('failure!'))
request_success.subscribe(lambda r: print_msg('success!'))

When the request fails, the script prints failure! as expected. However, when the response is OK, the script does not print success!. Interestingly, when I swap the order of the two subscriptions, success! does get printed, but failure! is never reached.

I figured the request observable might not be multicast, so I tried adding publish() to it and calling connect() after creating the subscriptions. That did not help (so I left it out of the minimal example above).

What am I missing?


Solution

  • Comparing your code to the unit tests that RxPY has for the partition operator, it looks like your code is almost correct.

    You were on the right track: you did need to turn the request Observable into a multicast observable.

    Here is working code (tested on Repl.it; you will have to convert the list of requests back to the classes/objects you're using in your code):

    from rx import Observable
    
    def print_msg(message):
      print(message)
    
    class Request(object):
      def __init__(self, status_code):
        self.status_code = status_code
    
    request = Observable.of(
      Request(200),
      Request(404),
      Request(412),
      Request(200),
    ).publish()
    
    request_success, request_failed = request.partition(
      lambda r: r.status_code == 200)
    
    request_success.subscribe(lambda r: print_msg('success!'))
    request_failed.subscribe(lambda r: print_msg('failure!'))
    request.connect()
    

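    Notice that the list of requests is published as soon as it is turned into an Observable (Observable.of(...).publish()), and connect is called only after we have subscribed to both partitioned observables. A published observable emits nothing until connect is called, so both subscribers are attached before the first item is delivered.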

    The output was:

    success!
    failure!
    failure!
    success!
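
To illustrate why the ordering matters, here is a minimal pure-Python sketch that does not use RxPY at all. TinySubject, partition, and run are hypothetical stand-ins written for this illustration, not RxPY's implementation. The sketch assumes that the shared source behind partition starts emitting as soon as the first subscriber attaches, which is consistent with the behavior described in the question but is not a verified description of RxPY 1.6.1 internals.

```python
class TinySubject(object):
    """Hypothetical stand-in for a hot, multicast source (not RxPY code)."""

    def __init__(self):
        self.observers = []

    def subscribe(self, on_next):
        self.observers.append(on_next)

    def emit(self, value):
        # A hot source only reaches observers attached at emission time.
        for on_next in list(self.observers):
            on_next(value)


def partition(subject, predicate):
    """Return two subscribe functions sharing one source: (matching, rest)."""
    def subscribe_matching(on_next):
        subject.subscribe(lambda v: on_next(v) if predicate(v) else None)

    def subscribe_rest(on_next):
        subject.subscribe(lambda v: None if predicate(v) else on_next(v))

    return subscribe_matching, subscribe_rest


def run(status_codes, connect_after_subscribing):
    log = []
    subject = TinySubject()
    success, failed = partition(subject, lambda code: code == 200)

    def connect():
        for code in status_codes:
            subject.emit(code)

    failed(lambda code: log.append('failure!'))
    if not connect_after_subscribing:
        # Assumed behavior: the source fires once the first subscriber attaches.
        connect()
    success(lambda code: log.append('success!'))
    if connect_after_subscribing:
        # Explicit connect: both subscribers are already attached.
        connect()
    return log


early = run([200], connect_after_subscribing=False)
late = run([200, 404, 412, 200], connect_after_subscribing=True)
print(early)  # []
print(late)   # ['success!', 'failure!', 'failure!', 'success!']
```

In the first run the only item is emitted before the success subscriber exists, so nothing is logged, which mirrors the missing success! in the question. In the second run emission is deferred until both subscribers are attached, which is the role publish() and connect() play in the working code above.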