Tags: google-cloud-platform, google-cloud-dataflow, apache-beam, google-cloud-composer

Composer throws error when installing setup.py with BeamRunPythonPipelineOperator


I am having trouble passing the argument "setup_file" to my BeamRunPythonPipelineOperator. Here is the traceback from the Composer logs:

[2022-11-16, 05:03:19 UTC] {beam.py:127} WARNING - error: [Errno 2] No such file or directory: 'csv_converter-0.0.1/csv_converter.egg-info/PKG-INFO'
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - Traceback (most recent call last):
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/site-packages/apache_beam/utils/processes.py", line 89, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     out = subprocess.check_output(*args, **kwargs)
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 415, in check_output
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -   File "/opt/python3.8/lib/python3.8/subprocess.py", line 516, in run
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING -     raise CalledProcessError(retcode, process.args,
[2022-11-16, 05:03:20 UTC] {beam.py:127} WARNING - subprocess.CalledProcessError: Command '['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpifl6ty8k']' returned non-zero exit status 1.

I have no clue why this [Errno 2] No such file or directory error occurs. Some DAGs run just fine, while others report this error. Sometimes I get different errors instead, such as another file referenced by setup.py not being found, or [Errno 5] Input/Output Error.
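
For what it's worth, the failing command can be re-run outside Airflow with something like this (the command itself is copied from the traceback; the working directory is my assumption about where Beam builds the sdist from setup_file):

import subprocess

# Re-run the sdist build step that fails in the logs above.
# The cwd is the folder containing my setup.py (assumed to be where Beam runs the command from).
subprocess.check_output(
    ['/usr/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/debug_sdist'],
    cwd='/home/airflow/gcs/data/csv_converter',
)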

This is my operator:

BeamRunPythonPipelineOperator(
    task_id='xxxx',
    runner="DataflowRunner",
    py_file='/home/airflow/gcs/data/csv_converter/main.py',
    pipeline_options={
        'project_id': project_id,
        'input_path': input_path,
        'output_path': output_path,
        'schema_path': schema_path,
        'service_account': service_account,     
        'no_use_public_ips': True,
        'subnetwork': subnetwork,      
        'staging_location': staging_location,
        'temp_location': temp_location,
        "setup_file": f'/home/airflow/gcs/data/csv_converter/setup.py',
        "machine_type": "n1-standard-4",
        "num_workers": 5,
        "max_num_workers": 10,
    },
    py_options=[],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        location=gce_region,
        wait_until_finished=False,
        gcp_conn_id="dataflow_conn"
    ),
)

This error is very frustrating, as I have no clue how to fix it and haven't found anyone else experiencing the same issue.

Some context: Our process consists of triggering DAGs when .CSV files land in a bucket. At first I thought it was a problem with the schedulers and concurrency, since we had some zombie tasks. I noticed that with 2 schedulers of 2 vCPUs each, CPU usage sits around ~80% (always stuck above 3 of the 4 vCPUs, even though DAGs are triggered in bursts when multiple .CSVs land). I tried increasing to 4 schedulers with 4 vCPUs each, but the problem persists. I expect the process to install my package correctly.

  • Composer version: 2.0.31
  • Airflow version: 2.3.3
  • apache-airflow-providers-google version: 8.1.0
  • apache-beam version: 2.41.0

Solution

  • I had this issue previously. I think that if you place the setup.py at the root of the Composer DAG folder, it will solve your issue.

    I also recommend deploying your Beam job folder under dags instead of data.

    The data folder is more typically used for Airflow variables.

    Example:

    # Imports needed by this snippet
    import os

    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

    # Get the DAG folder via the DAGS_FOLDER env var available in Composer
    dag_folder = os.getenv("DAGS_FOLDER")
    
    BeamRunPythonPipelineOperator(
        task_id='xxxx',
        runner="DataflowRunner",
        py_file='/home/airflow/gcs/dags/csv_converter/main.py',
        pipeline_options={
            'project_id': project_id,
            'input_path': input_path,
            'output_path': output_path,
            'schema_path': schema_path,
            'service_account': service_account,     
            'no_use_public_ips': True,
            'subnetwork': subnetwork,      
            'staging_location': staging_location,
            'temp_location': temp_location,
            'setup_file': f"{dag_folder}/setup.py",
            "machine_type": "n1-standard-4",
            "num_workers": 5,
            "max_num_workers": 10,
        },
        py_options=[],
        py_interpreter='python3',
        py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name='{{task.task_id}}',
            location=gce_region,
            wait_until_finished=False,
            gcp_conn_id="dataflow_conn"
        ),
    )
    

    Some explanations:

    • The setup.py is placed at the root of the DAG folder: {composer_bucket}/dags/setup.py (a minimal sketch is shown below)
    • The dag_folder is retrieved from a Composer environment variable
    • The setup file is passed as an option to the Beam operator as follows: 'setup_file': f"{dag_folder}/setup.py"
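
    For reference, a minimal setup.py at the root of the DAG folder could look like the sketch below. The package name and version come from the traceback above; the dependency list is a placeholder to adapt to the actual csv_converter package:

    # {composer_bucket}/dags/setup.py
    import setuptools

    setuptools.setup(
        name="csv_converter",                  # matches the name seen in the traceback
        version="0.0.1",
        packages=setuptools.find_packages(),   # picks up the csv_converter package under dags/
        install_requires=[],                   # PyPI dependencies the pipeline needs on the Dataflow workers
    )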