
How to wait for RxPy parallel threads to complete


Based on this excellent SO answer, I can get multiple tasks working in parallel in RxPy. My problem is: how do you wait for them all to complete? With threading I can call .join(), but there doesn't seem to be any equivalent with Rx schedulers. .to_blocking() doesn't help either: the MainThread finishes before all notifications have fired and before the completion handler has been called. Here's an example:

from __future__ import print_function
import os, sys
import time
import random
from rx import Observable
from rx.core import Scheduler
from threading import current_thread

def printthread(val):
    print("{}, thread: {}".format(val, current_thread().name))

def intense_calculation(value):
    printthread("calc {}".format(value))
    time.sleep(random.randint(5, 20) * .1)
    return value

if __name__ == "__main__":
    Observable.range(1, 3) \
        .select_many(lambda i: Observable.start(lambda: intense_calculation(i), scheduler=Scheduler.timeout)) \
        .observe_on(Scheduler.event_loop) \
        .subscribe(
            on_next=lambda x: printthread("on_next: {}".format(x)),
            on_completed=lambda: printthread("on_completed"),
            on_error=lambda err: printthread("on_error: {}".format(err)))

    printthread("\nAll done")
    # time.sleep(2)

Expected output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
All done, thread: MainThread

Actual output

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread

Actual output if I uncomment the sleep call

calc 1, thread: Thread-1
calc 2, thread: Thread-2
calc 3, thread: Thread-3

All done, thread: MainThread
on_next: 2, thread: Thread-4
on_next: 3, thread: Thread-4
on_next: 1, thread: Thread-4
on_completed, thread: Thread-4
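The outputs above show the core issue: nothing keeps MainThread alive until on_completed runs on Thread-4, which is why uncommenting time.sleep(2) appears to fix it. One common pattern (not from the original answer) is to have the completion callback set a threading.Event and have the main thread wait on it, the callback-style equivalent of .join(). The minimal sketch below uses only the standard library; run_in_background is a hypothetical stand-in for an Rx subscription, not an RxPy function.

```python
import threading
import time


def run_in_background(values, on_next, on_completed):
    """Hypothetical stand-in for an Rx subscription: delivers each value
    from its own worker thread, then calls on_completed exactly once,
    after every on_next call has happened."""
    remaining = len(values)
    lock = threading.Lock()

    def worker(v):
        nonlocal remaining
        time.sleep(0.05 * v)  # simulate intense_calculation
        on_next(v)
        with lock:
            remaining -= 1
            is_last = remaining == 0
        if is_last:
            on_completed()

    for v in values:
        threading.Thread(target=worker, args=(v,)).start()


results = []
done = threading.Event()

run_in_background([1, 2, 3], on_next=results.append, on_completed=done.set)

# Block MainThread until on_completed has fired (with a safety timeout).
finished = done.wait(timeout=5)
print("All done:", sorted(results))
```

With RxPy, the same wiring would presumably be passing done.set as on_completed (and something like lambda err: done.set() as on_error) to subscribe(), then calling done.wait() before printthread("\nAll done"); treat that as an untested assumption.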

Solution

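  • For a ThreadPoolScheduler, you can:

    1. Create the scheduler: scheduler = ThreadPoolScheduler(pool_size)
    2. Run the parallel calls on that scheduler (e.g. pass scheduler=scheduler to Observable.start instead of Scheduler.timeout).
    3. Call scheduler.executor.shutdown(), which blocks until all submitted work has finished.

    Once shutdown() returns, all of the results have been produced.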
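Step 3 works because ThreadPoolScheduler keeps its worker pool in scheduler.executor, a concurrent.futures.ThreadPoolExecutor, and its shutdown() waits for pending work by default (wait=True). The sketch below shows only that blocking behavior with the standard library, no Rx involved; task and the shortened sleep are illustrative, not part of the original answer.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def intense_calculation(value):
    # Simulated work, as in the question (shorter sleep for the demo).
    time.sleep(0.05 * value)
    return value


results = []
lock = threading.Lock()


def task(value):
    result = intense_calculation(value)
    with lock:
        results.append(result)


# Plays the role of ThreadPoolScheduler(pool_size).executor.
executor = ThreadPoolExecutor(max_workers=3)
for i in range(1, 4):
    executor.submit(task, i)

# shutdown(wait=True) blocks until every submitted task has finished.
executor.shutdown(wait=True)
print("All done:", sorted(results))  # All done: [1, 2, 3]
```

Note that shutdown() only waits for work submitted to that executor. In the question's pipeline, notifications re-scheduled with observe_on(Scheduler.event_loop) run on a separate event-loop thread, so they may still be in flight when shutdown() returns.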