Tags: google-cloud-functions, google-cloud-dataflow, python-3.7, apache-beam, dill

Apache Beam pipeline with DataflowRunner runs into _dill.py: "ModuleNotFoundError: No module named 'main'" when deployed from a Cloud Function


I am trying to execute a Dataflow pipeline from Cloud Functions on GCP using the Python SDK. I tested the code on a notebook server, where the pipeline works with DataflowRunner. However, when I invoke the pipeline from Cloud Functions, I get the following:

Error


Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 346, in run_http_function
    result = _function_handler.invoke_user_function(flask.request)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 217, in invoke_user_function
    return call_user_function(request_or_event)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py", line 210, in call_user_function
    return self._user_function(request_or_event)
  File "/user_code/main.py", line 215, in run_main
    BUCKET=BUCKET)
  File "/user_code/main.py", line 143, in dataflow
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/env/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 481, in __exit__
    self.run().wait_until_finish()
  File "/env/local/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1449, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 286, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 602, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 290, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'main'

So it seems to me that this issue only occurs when Dataflow is invoked serverless. I tried adding a setup file, as suggested here, to make the pipeline install the dependencies in the correct versions, but this didn't fix it. This question seems similar to this one; however, I doubt the only (unaccepted) answer there will work, as Cloud Functions code always runs from main.py.
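
A likely explanation, sketched here with the standard-library pickle rather than Beam's dill-based pickler (the assumption being that dill treats classes from an importable, non-`__main__` module the same way): a class defined in a module named `main` is serialized as a reference to `main.<ClassName>`, not as code. The worker then has to import `main`, which it does not have. The module and the `GetResponse` class below are stand-ins and need no Beam installation.

```python
import pickle
import sys
import types

# Stand-in for the Cloud Function's main.py: a module literally named "main"
# that defines a DoFn-like class (hypothetical, no Beam required).
fake_main = types.ModuleType("main")
exec("class GetResponse:\n    pass\n", fake_main.__dict__)
sys.modules["main"] = fake_main

# "Submission side": the instance is pickled as a reference to main.GetResponse.
payload = pickle.dumps(fake_main.GetResponse())
pickled_by_reference = b"main" in payload and b"GetResponse" in payload

# "Worker side": no module named main is importable there. Simulate that by
# dropping the module and temporarily emptying the import path.
del sys.modules["main"]
saved_path, sys.path = sys.path, []
try:
    pickle.loads(payload)
    worker_error = None
except ModuleNotFoundError as exc:
    worker_error = str(exc)
finally:
    sys.path = saved_path

print(pickled_by_reference, worker_error)
```

If this is indeed the mechanism, it would also explain why the same code works from a notebook: there the classes live in `__main__`, which dill can serialize by value.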

The pipeline code

import os
from io import BytesIO

import apache_beam as beam
import requests
from PIL import Image


class getResponse(beam.DoFn):
    """Fetch each row's URL and pass on only successful (2xx) responses."""
    def process(self, element, urlfield, pfield):
        response = requests.get(element[urlfield])
        status_code = response.status_code
        if 200 <= status_code < 300:
            yield {'id': element[pfield], 'response': response, 'url': element[urlfield]}

class getImageData(beam.DoFn):
    """Shrink the downloaded image to 10x10 grayscale and emit its pixel values."""
    def process(self, element, responsefield, urlfield, pfield):
        p = element[responsefield]
        img = Image.open(BytesIO(p.content)).resize((10, 10), Image.ANTIALIAS).convert("L")
        yield {'id': element[pfield], 'url': element[urlfield], 'image_data': list(img.getdata())}

class outputDummies(beam.DoFn):
    """Emit only the rows whose pixel values equal the known dummy image."""
    def process(self, element, dummy_data, image_datafield, urlfield, pfield):
        if element[image_datafield] == dummy_data:
            yield {'id': element[pfield], 'url': element[urlfield]}

def dataflow(in_test_mode=True,
             query=None,
             table_schema=None,
             table_spec=None,
             dummy_data=None,
             job_name=None,
             PROJECT=None,
             REGION=None,
             BUCKET=None):
    if in_test_mode:
        RUNNER = "DirectRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)
    else:
        RUNNER = "DataflowRunner"
        OUTPUT_DIR = "gs://{0}/dummy_images/".format(BUCKET)

    options = {
        "job_name": job_name,
        "project": PROJECT,
        "region": REGION,
        "staging_location": os.path.join(OUTPUT_DIR, "tmp", "staging"),
        "temp_location": os.path.join(OUTPUT_DIR, "tmp"),
        "streaming": False
    }
    opts = beam.pipeline.PipelineOptions(**options)
    # Run Beam
    with beam.Pipeline(RUNNER,
                       options=opts,
                       argv=['--setup_file', '/tmp/setup.py']) as p:
        (p |
         "Read data" >> beam.io.Read(beam.io.BigQuerySource(query=query,
                                                               use_standard_sql=True)) |
         "Get responses" >> beam.ParDo(getResponse(),
                                       urlfield='url',
                                       pfield='id') |

         "Process images" >> beam.ParDo(getImageData(),
                                        responsefield='response',
                                        urlfield='url',
                                        pfield='id') |

         "Output dummy images" >> beam.ParDo(outputDummies(),
                                             dummy_data=dummy_data,
                                             image_datafield='image_data',
                                             urlfield='url',
                                             pfield='id') |

         "Write to BQ" >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
         )
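
One thing that may explain why the setup file had no effect: as far as I can tell from the Beam 2.19 source, beam.Pipeline only parses argv when options is not given, so the --setup_file above would be silently dropped. Below is a minimal sketch of passing it through the options dictionary instead. `build_dataflow_options` is a hypothetical helper, and the project, region and bucket values are placeholders. The setup file would still need to package the DoFn classes in an importable module other than main, which is an assumption about the fix and not something confirmed here.

```python
import posixpath


def build_dataflow_options(job_name, project, region, bucket,
                           setup_file="/tmp/setup.py"):
    """Return keyword arguments for beam.pipeline.PipelineOptions (hypothetical helper)."""
    output_dir = "gs://{0}/dummy_images/".format(bucket)
    return {
        "job_name": job_name,
        "project": project,
        "region": region,
        "staging_location": posixpath.join(output_dir, "tmp", "staging"),
        "temp_location": posixpath.join(output_dir, "tmp"),
        "streaming": False,
        # Passed as an option rather than through argv, so it is not ignored.
        "setup_file": setup_file,
    }


# Placeholder values, for illustration only.
options = build_dataflow_options("dummy-images", "my-project", "europe-west1", "my-bucket")

# With Beam installed, these would be used as:
#   opts = beam.pipeline.PipelineOptions(**options)
#   with beam.Pipeline("DataflowRunner", options=opts) as p: ...
print(options["staging_location"])
```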

requirements.txt

apache_beam[gcp]==2.19.0
pillow==6.2.1
requests==2.23.0

Anybody got a workaround?


Solution

  • Thanks for all your suggestions. In the end I did not find a solution to the error itself, but I did find one for the workflow. As AMargheriti pointed out, there are always Dataflow templates. By creating a custom template from the code, I was able to trigger the pipeline from a Cloud Function. Useful documentation: the page on creating Dataflow templates, the running templates page, and lastly this solution, because the API suggested on the running templates page does not allow setting the region in which to run the template, whereas dataflow().projects().locations().templates().launch() does.
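
For illustration, a minimal sketch of what such a launch call might look like from a Cloud Function using the Google API Python client. The helper names, project, region, bucket path and job name are all placeholders, not values from the original setup, and the call itself needs google-api-python-client plus credentials to run.

```python
def build_launch_kwargs(project, region, template_path, job_name, parameters=None):
    """Arguments for dataflow().projects().locations().templates().launch()."""
    return {
        "projectId": project,
        "location": region,        # the region the job should run in
        "gcsPath": template_path,  # gs:// path of the staged template
        "body": {"jobName": job_name, "parameters": parameters or {}},
    }


def launch_template(project, region, template_path, job_name, parameters=None):
    """Launch a staged template; requires google-api-python-client and credentials."""
    # Imported lazily so the helper above can be used without the client library.
    from googleapiclient.discovery import build

    dataflow = build("dataflow", "v1b3", cache_discovery=False)
    request = dataflow.projects().locations().templates().launch(
        **build_launch_kwargs(project, region, template_path, job_name, parameters))
    return request.execute()


# Placeholder values, for illustration only.
kwargs = build_launch_kwargs(
    "my-project", "europe-west1",
    "gs://my-bucket/templates/dummy_images", "dummy-images-job")
print(kwargs["location"])
```

Keeping the argument construction separate from the API call makes it possible to check the request without touching GCP.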