
prefect-shell: Task run encountered an exception: RuntimeError: PID xxx failed with return code 6


When I run Rclone directly in a shell to list the differences between a local folder and an S3 folder, it succeeds:

➜ rclone copy --dry-run --include="*.tdms" --use-json-log /Users/hongbo-miao/Documents/motor hm-s3:my-bucket/motor
{"level":"warning","msg":"Skipped copy as --dry-run is set (size 30.518Mi)","object":"motor-1.tdms","objectType":"*local.Object","size":32000448,"skipped":"copy","source":"operations/operations.go:2360","time":"2023-06-05T17:34:51.1769-07:00"}
{"level":"warning","msg":"\nTransferred:   \t   30.518 MiB / 30.518 MiB, 100%, 0 B/s, ETA -\nTransferred:            1 / 1, 100%\nElapsed time:         0.7s\n\n","source":"accounting/stats.go:498","stats":{"bytes":32000448,"checks":0,"deletedDirs":0,"deletes":0,"elapsedTime":0.795462417,"errors":0,"eta":null,"fatalError":false,"renames":0,"retryError":false,"speed":0,"totalBytes":32000448,"totalChecks":0,"totalTransfers":1,"transferTime":0,"transfers":1},"time":"2023-06-05T17:34:51.178495-07:00"}

However, if I use ShellOperation from prefect-shell v0.1.5 to run the same command:

from prefect import flow, task
from prefect_shell import ShellOperation


@task
async def get_missing_files() -> list[str]:
    # ShellOperation.run() returns the command's output as a list of lines
    log = await ShellOperation(
        commands=['rclone copy --dry-run --include="*.tdms" --use-json-log /Users/hongbo-miao/Documents/motor hm-s3:my-bucket/motor'],
        stream_output=False,
    ).run()
    return log


@flow
async def ingest_data() -> None:
    # The task takes no arguments; both paths are hard-coded in the command above
    missing_files_list = await get_missing_files()

I got this error:

➜ python src/main.py
17:38:33.356 | INFO    | prefect.engine - Created flow run 'chirpy-bull' for flow 'ingest-data'
17:38:33.869 | INFO    | Flow run 'chirpy-bull' - Created task run 'get_missing_files-0' for task 'get_missing_files'
17:38:33.870 | INFO    | Flow run 'chirpy-bull' - Executing 'get_missing_files-0' immediately...
17:38:34.102 | INFO    | Task run 'get_missing_files-0' - PID 40879 triggered with 1 commands running inside the '.' directory.
17:38:34.532 | ERROR   | Task run 'get_missing_files-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
    log = await ShellOperation(
          ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
    yield process
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
    await shell_process.wait_for_completion()
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
    raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
17:38:34.621 | ERROR   | Task run 'get_missing_files-0' - Finished in state Failed('Task run encountered an exception: RuntimeError: PID 40879 failed with return code 6.\n')
17:38:34.621 | ERROR   | Flow run 'chirpy-bull' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "ingest-data/src/main.py", line 18, in ingest_data
    missing_files_list = await get_missing_files(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
    log = await ShellOperation(
          ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
    yield process
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
    await shell_process.wait_for_completion()
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
    raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
17:38:34.732 | ERROR   | Flow run 'chirpy-bull' - Finished in state Failed('Flow run encountered an exception. RuntimeError: PID 40879 failed with return code 6.\n')
Traceback (most recent call last):
  File "ingest-data/src/main.py", line 41, in <module>
    asyncio.run(ingest_data())
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "ingest-data/src/main.py", line 18, in ingest_data
    missing_files_list = await get_missing_files(
                         ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
    log = await ShellOperation(
          ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
    yield process
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
    await shell_process.wait_for_completion()
  File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
    raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
make: *** [poetry-run-dev] Error 1

What does this error mean? Thanks!


Solution

  • In my case, the Prefect agent runs Rclone, which then reads the local folder /Users/hongbo-miao/Documents/motor. I originally assumed that because Rclone could access the folder from my shell, the same Rclone command would also succeed inside a Prefect task.

    However, it seems that the Prefect agent itself also needs access to the local folder /Users/hongbo-miao/Documents/motor, and in my case it does not seem to have it.

    So I need to make sure the Prefect agent can access the folder too.

    As far as I know, the Prefect agent can access /tmp. If I move the folder to /tmp/motor and change the path in the code accordingly:

    log = await ShellOperation(
        commands=['rclone copy --dry-run --include="*.tdms" --use-json-log /tmp/motor hm-s3:my-bucket/motor'],
        stream_output=False,
    ).run()
    

    Then it works well.
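
To narrow down a failure like this, it can help to check from inside the same process whether the folder is readable, and to capture the command's stderr rather than only its return code. Below is a minimal sketch using only the Python standard library, not the prefect-shell API. The helper names (can_list_dir, run_and_capture, describe_rclone_exit) are made up for illustration, and the exit-code table is a subset of the codes listed in rclone's documentation, where 6 means "less serious errors (NoRetry errors)".

```python
import os
import subprocess

# Subset of the exit codes listed in rclone's documentation.
RCLONE_EXIT_CODES = {
    0: "success",
    3: "directory not found",
    4: "file not found",
    5: "temporary error (retries might fix it)",
    6: "less serious errors (NoRetry errors)",
    7: "fatal error (retries won't fix it)",
}


def can_list_dir(path: str) -> bool:
    """Return True if the current process can actually list the directory."""
    try:
        os.listdir(path)
    except OSError:  # covers PermissionError and FileNotFoundError
        return False
    return True


def run_and_capture(command: str) -> tuple[int, str, str]:
    """Run a shell command and return (return code, stdout, stderr) without raising."""
    completed = subprocess.run(
        command, shell=True, capture_output=True, text=True, check=False
    )
    return completed.returncode, completed.stdout, completed.stderr


def describe_rclone_exit(code: int) -> str:
    """Map an rclone return code to a short description (hypothetical helper)."""
    return RCLONE_EXIT_CODES.get(code, "see rclone's exit code list")
```

Calling can_list_dir("/Users/hongbo-miao/Documents/motor") inside the task, just before the ShellOperation, would show whether that process can read the folder at all. Passing the rclone command to run_and_capture and logging the stderr it returns should reveal the underlying message that the bare "return code 6" hides.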