I use RxPY to process files, and I want to build a sequence of loading pipelines:
import threading

# Every pipeline subscribes on the thread pool (ops.subscribe_on), so each
# subscribe() call returns immediately; without an explicit stop point all
# three pipelines run concurrently.
pool_scheduler = ThreadPoolScheduler(multiprocessing.cpu_count())

# Set once the first pipeline terminates. It is set on error as well as on
# completion; otherwise a failure would leave the wait() below blocked forever.
independing_done = threading.Event()


def _on_independing_error(error):
    """Report the first pipeline's error and release the stop point."""
    print(error)
    independing_done.set()


def _on_independing_completed():
    """Log completion of the first pipeline and release the stop point."""
    logger.info("independing frames loaded!")
    independing_done.set()


rx.from_list(independing_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler),
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=_on_independing_error,
    on_completed=_on_independing_completed)

# Stop point: the second and third pipelines start only after the first one
# has terminated. They may still run concurrently with each other.
# NOTE(review): they also start if the first pipeline failed; check for that
# here if the dependent loads must be skipped in that case.
independing_done.wait()

# Only files for which _table_name_on_contain() returns 'mellow' go through
# the apples dataflow.
withdraw_file = [
    file for file in filtered_files
    if self._table_name_on_contain(file) == 'mellow'
]
rx.from_list(withdraw_file).pipe(
    self._build_apples_dataflow(),
    ops.subscribe_on(pool_scheduler)
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: logger.info("apples loaded!"))
rx.from_list(depending_files).pipe(
    self._build_dataflow(),
    ops.subscribe_on(pool_scheduler)
).subscribe(
    on_next=lambda file: logger.info(f'file: {file}'),
    on_error=print,
    on_completed=lambda: self._complete_action())
But I got a result I did not expect: each pipe seems to run asynchronously, because I have not defined any "stop points". I want the second and third pipes to start only after the first pipe is done. How can I fix this?
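Because `ops.subscribe_on(pool_scheduler)` performs each subscription on a pool thread, `subscribe()` returns immediately and the pipes overlap. You can use an event (`threading.Event` is enough, since `ThreadPoolScheduler` uses threads; `multiprocessing.Event` also works) to make the main flow wait until the first pipe has finished: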
import threading

# Released when the first pipe terminates. It is set on error too; otherwise a
# failing first pipe would leave event.wait() blocked forever.
event = threading.Event()
rx.pipe(...).subscribe(
    on_completed=event.set,
    on_error=lambda error: event.set(),
)
# Stop point: block until the first pipe has finished.
event.wait()
# These start only after the first pipe is done.
rx.pipe(...)
rx.pipe(...)