
How to synchronize rx pipes in Python?


I use RxPy to process files, and I want to build a sequence of loading pipes:

    pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

    rx.from_list(independing_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler),
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("independing frames loaded!"))

    withdraw_file = []
    for file in filtered_files:
        if self._table_name_on_contain(file) == 'mellow':
            withdraw_file += [file]

    rx.from_list(withdraw_file).pipe(
        self._build_apples_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: logger.info("apples loaded!"))

    rx.from_list(depending_files).pipe(
        self._build_dataflow(),
        ops.subscribe_on(pool_scheduler)
    ).subscribe(
        on_next=lambda file: logger.info(f'file: {file}'),
        on_error=print,
        on_completed=lambda: self._complete_action())

But the result is not what I expected: the pipes appear to run concurrently because I have not defined any "stop points". I want the second and third pipes to start only after the first pipe has finished. How can I fix this?


Solution

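  • You can use multiprocessing.Event to synchronize your pipes: set the event in the first pipe's on_completed callback, wait on it, and only then subscribe to the remaining pipes: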

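    import multiprocessing

    event = multiprocessing.Event()

    # First pipe: signal the event once it completes.
    rx.pipe(...).subscribe(on_completed=event.set)

    # Block the calling thread until the first pipe has finished.
    event.wait()

    # The second and third pipes are subscribed only now.
    rx.pipe(...)
    rx.pipe(...)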
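
The gating idea above does not depend on RxPY itself, so it can be tried out with the standard library alone. The following is only a minimal sketch under some assumptions: `run_pipeline` and `load_in_order` are hypothetical names standing in for the real `rx.from_list(...).pipe(...).subscribe(...)` calls, and `threading.Event` is used in place of `multiprocessing.Event`, which should be enough here because `ThreadPoolScheduler` runs its work on threads of the same process.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(pool, files, on_next, on_completed):
    """Hypothetical stand-in for one subscribed pipe.

    Emits every file to on_next on a worker thread, then calls on_completed.
    """
    def work():
        for file in files:
            on_next(file)
        on_completed()

    return pool.submit(work)


def load_in_order(independent_files, dependent_files):
    """Run the dependent pipeline only after the independent one completes."""
    order = []
    lock = threading.Lock()

    def record(file):
        # Collect processed files so the ordering can be inspected afterwards.
        with lock:
            order.append(file)

    first_done = threading.Event()

    with ThreadPoolExecutor(max_workers=2) as pool:
        # First pipeline: its completion callback sets the event.
        run_pipeline(pool, independent_files, record, first_done.set)

        # The "stop point": block here until the first pipeline is done.
        first_done.wait()

        # Only now is the dependent pipeline started.
        run_pipeline(pool, dependent_files, record, lambda: None)

    # Leaving the with-block waits for all submitted work to finish.
    return order


if __name__ == "__main__":
    print(load_in_order(["a.csv", "b.csv"], ["c.csv"]))
```

One design point to keep in mind: if the first pipeline fails, `on_completed` is never called and `wait()` would block forever. Setting the event from the error handler as well, or calling `wait(timeout=...)`, avoids that.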