google-cloud-dataflow, apache-beam

Can't pass in requirements.txt for Dataflow


I've been trying to deploy a pipeline on Google Cloud Dataflow, and it's been quite a challenge so far. I'm facing an import issue: I realised that ParDo functions require the requirements.txt to be present, otherwise the job says that it can't find the required module. https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

So I tried fixing the problem by passing in the requirements.txt file, only to be met with a very incomprehensible error message.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

from google.cloud.bigtable.row import DirectRow
import datetime

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://tunnel-insight-2-0-dev-291100/dataflow'

# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

ib.options.recording_duration = '1m'

...
...

pipeline_result = DataflowRunner().run_pipeline(p, options=options)

I've tried to pass in the requirements file using:

options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

I get this error

---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     90     try:
---> 91       out = subprocess.check_output(*args, **kwargs)
     92     except OSError:

/opt/conda/lib/python3.7/subprocess.py in check_output(timeout, *popenargs, **kwargs)
    410     return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
--> 411                **kwargs).stdout
    412 

/opt/conda/lib/python3.7/subprocess.py in run(input, capture_output, timeout, check, *popenargs, **kwargs)
    511             raise CalledProcessError(retcode, process.args,
--> 512                                      output=stdout, stderr=stderr)
    513     return CompletedProcess(process.args, retcode, stdout, stderr)

CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-12-f018e5c84d08> in <module>
----> 1 pipeline_result = DataflowRunner().run_pipeline(p, options=options)

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in run_pipeline(self, pipeline, options)
    491           environments.DockerEnvironment.from_container_image(
    492               apiclient.get_container_image_from_options(options),
--> 493               artifacts=environments.python_sdk_dependencies(options)))
    494 
    495     # This has to be performed before pipeline proto is constructed to make sure

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/transforms/environments.py in python_sdk_dependencies(options, tmp_dir)
    624           options,
    625           tmp_dir,
--> 626           skip_prestaged_dependencies=skip_prestaged_dependencies))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in create_job_resources(options, temp_dir, build_setup_args, populate_requirements_cache, skip_prestaged_dependencies)
    178             populate_requirements_cache if populate_requirements_cache else
    179             Stager._populate_requirements_cache)(
--> 180                 setup_options.requirements_file, requirements_cache_path)
    181         for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
    182           resources.append((pkg, os.path.basename(pkg)))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/retry.py in wrapper(*args, **kwargs)
    234       while True:
    235         try:
--> 236           return fun(*args, **kwargs)
    237         except Exception as exn:  # pylint: disable=broad-except
    238           if not retry_filter(exn):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/portability/stager.py in _populate_requirements_cache(requirements_file, cache_dir)
    569     ]
    570     _LOGGER.info('Executing command: %s', cmd_args)
--> 571     processes.check_output(cmd_args, stderr=processes.STDOUT)
    572 
    573   @staticmethod

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py in check_output(*args, **kwargs)
     97           "Full traceback: {} \n Pip install failed for package: {} \
     98           \n Output from execution of subprocess: {}" \
---> 99           .format(traceback.format_exc(), args[0][6], error.output))
    100       else:
    101         raise RuntimeError("Full trace: {}, \

RuntimeError: Full traceback: Traceback (most recent call last):
  File "/root/apache-beam-custom/packages/beam/sdks/python/apache_beam/utils/processes.py", line 91, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/conda/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/conda/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/root/apache-beam-custom/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1.
 
 Pip install failed for package: -r           
 Output from execution of subprocess: b'Obtaining file:///root/apache-beam-custom/packages/beam/sdks/python (from -r requirements.txt (line 3))\n  Saved /tmp/dataflow-requirements-cache/apache-beam-2.25.0.zip\nCollecting absl-py==0.11.0\n  Downloading absl-py-0.11.0.tar.gz (110 kB)\n  Saved /tmp/dataflow-requirements-cache/absl-py-0.11.0.tar.gz\nCollecting argon2-cffi==20.1.0\n  Downloading argon2-cffi-20.1.0.tar.gz (1.8 MB)\n  Installing build dependencies: started\n  Installing build dependencies: finished with status \'error\'\n  ERROR: Command errored out with exit status 1:\n   command: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\'\n       cwd: None\n  Complete output (85 lines):\n  Collecting setuptools>=40.6.0\n    Downloading setuptools-51.1.1.tar.gz (2.1 MB)\n  Collecting wheel\n    Downloading wheel-0.36.2.tar.gz (65 kB)\n  Collecting cffi>=1.0\n    Downloading cffi-1.14.4.tar.gz (471 kB)\n  Collecting pycparser\n    Downloading pycparser-2.20.tar.gz (161 kB)\n  Skipping wheel build for setuptools, due to binaries being disabled for it.\n  Skipping wheel build for wheel, due to binaries being disabled for it.\n  Skipping wheel build for cffi, due to binaries being disabled for it.\n  Skipping wheel build for pycparser, due to binaries being disabled for it.\n  Installing collected packages: setuptools, wheel, pycparser, cffi\n      Running setup.py install for setuptools: started\n      Running setup.py install for setuptools: finished with status \'done\'\n      Running setup.py install for wheel: started\n      Running setup.py install for wheel: finished with status \'done\'\n      Running setup.py install for pycparser: started\n      Running setup.py install for pycparser: finished with status \'done\'\n      Running setup.py install for cffi: started\n      Running setup.py install for cffi: finished with status \'error\'\n      ERROR: Command errored out with exit status 1:\n       command: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi\n           cwd: /tmp/pip-install-6zs5jguv/cffi/\n      Complete output (56 lines):\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      
No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      Package libffi was not found in the pkg-config search path.\n      Perhaps you should add the directory containing `libffi.pc\'\n      to the PKG_CONFIG_PATH environment variable\n      No package \'libffi\' found\n      running install\n      running build\n      running build_py\n      creating build\n      creating build/lib.linux-x86_64-3.7\n      creating build/lib.linux-x86_64-3.7/cffi\n      copying cffi/setuptools_ext.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/pkgconfig.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/verifier.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/vengine_gen.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/backend_ctypes.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/__init__.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/cffi_opcode.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/error.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/api.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/commontypes.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/ffiplatform.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/lock.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/cparser.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/recompiler.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/vengine_cpy.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/model.py -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_cffi_include.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/parse_c_type.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_embedding.h -> build/lib.linux-x86_64-3.7/cffi\n      copying cffi/_cffi_errors.h -> build/lib.linux-x86_64-3.7/cffi\n      running build_ext\n      building \'_cffi_backend\' extension\n      creating build/temp.linux-x86_64-3.7\n      creating build/temp.linux-x86_64-3.7/c\n      gcc -pthread -B /opt/conda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -DUSE__THREAD -DHAVE_SYNC_SYNCHRONIZE -I/usr/include/ffi -I/usr/include/libffi -I/root/apache-beam-custom/include -I/opt/conda/include/python3.7m -c c/_cffi_backend.c -o build/temp.linux-x86_64-3.7/c/_cffi_backend.o\n      c/_cffi_backend.c:15:10: fatal error: ffi.h: No such file or directory\n       #include <ffi.h>\n                ^~~~~~~\n      compilation terminated.\n      error: command \'gcc\' failed with exit status 1\n      ----------------------------------------\n  ERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python -u -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-install-6zs5jguv/cffi/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\\n\'"\'"\', \'"\'"\'\\n\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' install --record /tmp/pip-record-z8o69lka/install-record.txt --single-version-externally-managed --prefix /tmp/pip-build-env-3iuiaex9/overlay --compile --install-headers /root/apache-beam-custom/include/site/python3.7/cffi Check the logs for full command output.\n  WARNING: You are using pip version 20.1.1; 
however, version 20.3.3 is available.\n  You should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n  ----------------------------------------\nERROR: Command errored out with exit status 1: /root/apache-beam-custom/bin/python /root/apache-beam-custom/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-3iuiaex9/overlay --no-warn-script-location --no-binary :all: --only-binary :none: -i https://pypi.org/simple -- \'setuptools>=40.6.0\' wheel \'cffi>=1.0\' Check the logs for full command output.\nWARNING: You are using pip version 20.1.1; however, version 20.3.3 is available.\nYou should consider upgrading via the \'/root/apache-beam-custom/bin/python -m pip install --upgrade pip\' command.\n'

Did I do something wrong?
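
Digging through that wall of output, the actual failure seems to be cffi failing to build from source ("fatal error: ffi.h: No such file or directory", i.e. the libffi headers are missing in my environment) while pip resolves the build dependencies of argon2-cffi. Because the staging step runs pip download with --no-binary :all:, every entry in requirements.txt, plus its build dependencies, has to be compiled from source. One thing I'm considering is trimming requirements.txt down to just what the pipeline code needs, rather than a full freeze of my notebook environment; a sketch of what I mean (the package name is an assumption for illustration, not a verified fix):

# requirements.txt (hypothetical trimmed sketch - only what the DoFns import)
google-cloud-bigtable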

-------------- EDIT---------------------------------------

OK, I've got my pipeline to work, but I'm still having a problem with my requirements.txt file, which I believe I'm passing in correctly.

My pipeline code:

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.runners import DataflowRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

from google.cloud.bigtable.row import DirectRow
import datetime

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = ''

# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the requirements.txt file
options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt"

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

ib.options.recording_duration = '1m'

# The Google Cloud PubSub topic for this example.
topic = ""
subscription = ""
output_topic = ""

# Info
project_id = ""
bigtable_instance = ""
bigtable_table_id = ""

class CreateRowFn(beam.DoFn):
    def process(self, words):
        # Imports are inside process() so the modules are available on the
        # Dataflow workers (see the NameError discussion below).
        from google.cloud.bigtable.row import DirectRow
        import datetime
        direct_row = DirectRow(row_key="phone#4c410523#20190501")
        direct_row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            datetime.datetime.now())
        return [direct_row]

p = beam.Pipeline(InteractiveRunner(),options=options)
words = p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
# Writing to BigTable
test = words | beam.ParDo(CreateRowFn()) | WriteToBigTable(
    project_id=project_id,
    instance_id=bigtable_instance,
    table_id=bigtable_table_id)

pipeline_result = DataflowRunner().run_pipeline(p, options=options)

As you can see in "CreateRowFn", I need to put the imports inside the function:

from google.cloud.bigtable.row import DirectRow
import datetime

Only then does this work.

I've passed in requirements.txt with options.view_as(pipeline_options.SetupOptions).requirements_file = "requirements.txt", and I can see it on the Dataflow console.

If I remove the import statements, I get "in process NameError: name 'DirectRow' is not defined".

Is there any way to overcome this?


Solution

  • I've found the answer in the FAQs. My mistake was not about how to pass in requirements.txt, but about how to handle NameErrors:

    https://cloud.google.com/dataflow/docs/resources/faq

    How do I handle NameErrors? If you're getting a NameError when you execute your pipeline using the Dataflow service but not when you execute locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Dataflow worker.

    By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Dataflow worker.

    Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is regarding a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.

    For example, instead of:

    import re
    …
    def myfunc():
      # use re module
    

    use:

    def myfunc():
      import re
      # use re module
    

    Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.

    So the conclusion is: it is OK to use import statements within the functions. (For reference, the programmatic form of the --save_main_session option mentioned above is sketched below.)
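
    Since I build my options object in code rather than passing command-line flags, the --save_main_session option from the FAQ would look roughly like the sketch below (assuming the same options object as in the pipeline code above; I went with local imports instead, so this is untested in my setup):

    # Programmatic equivalent of the --save_main_session flag from the FAQ:
    # pickle the main session's globals (imports, functions, variables) so they
    # are restored on the Dataflow workers. Sketch only - not what I ended up using.
    options.view_as(pipeline_options.SetupOptions).save_main_session = True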