Search code examples
pythonreactivexrx-py

in ReactiveX, how do I pass other parameters to Observer.create?


Using RxPY for illustration purposes.

I want to create an observable from a function, but that function must take parameters. This particular example must return, at random intervals, one of many pre-defined tickers which I want to send to it. My solution thus far is to use a closure:

from __future__ import print_function

from rx import Observable
import random
import string
import time

def make_tickers(n = 300, s = 123):
    """ generates up to n unique 3-letter strings geach makde up of uppsercase letters"""
    random.seed(s)
    tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
    tickers = list(set(tickers)) # unique
    print(len(tickers))
    return(tickers)

def spawn_prices_fn(tickers):
    """ returns a function that will return a random element 
    out of tickers every 20-100 ms, and takes an observable parameter """

    def spawner(observer):
        while True:
            next_tick = random.choice(tickers)
            observer.on_next(next_tick)
            time.sleep(random.randint(20, 100)/1000.0)

    return(spawner)


if __name__ == "__main__":
    spawned = spawn_prices_fn(make_tickers())
    xx = Observable.create(spawned)
    xx.subscribe(lambda s: print(s))

Is there a simpler way? Can further parameters be sent to Observable.create's first paramater function, that don't require a closure? What is the canonical advice?


Solution

  • It can be done in numerous ways, here's one of the solutions that doesn't change your code too much. Note that tickers generation could also be broken up into a function generating a single string combined with some rx magic to be more rx-like

    I also slightly adjusted the code to make flake8 happy

    from __future__ import print_function
    
    import random
    import string
    import time
    
    from rx import Observable
    
    
    def make_tickers(n=300, s=123):
        """
        Generates up to n unique 3-letter strings each made up of uppercase letters
        """
        random.seed(s)
        tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3))
                   for y in range(n)]
        tickers = list(set(tickers))  # unique
        print(len(tickers))
        return(tickers)
    
    
    def random_picker(tickers):
        ticker = random.choice(tickers)
        time.sleep(random.randint(20, 100) / 1000.0)
        return ticker
    
    
    if __name__ == "__main__":
        xx = Observable\
            .repeat(make_tickers())\
            .map(random_picker)\
            .subscribe(lambda s: print(s))
    

    or a solution without make_tickers:

    from __future__ import print_function
    
    import random
    import string
    import time
    
    from rx import Observable
    
    
    def random_picker(tickers):
        ticker = random.choice(tickers)
        time.sleep(random.randint(20, 100) / 1000.0)
        return ticker
    
    
    if __name__ == "__main__":
        random.seed(123)
        Observable.range(1, 300)\
            .map(lambda _: ''.join(random.choice(string.ascii_uppercase)
                                   for _ in range(3)))\
            .reduce(lambda x, y: x + [y], [])\
            .do_while(lambda _: True)\
            .map(random_picker)\
            .subscribe(lambda s: print(s))
    

    time.sleep could be moved away from random_picker but the code would become a bit trickier