python google-cloud-dataflow apache-beam

NameError when running an Apache Beam pipeline that uses nested functions on Google Dataflow


I am working on an Apache Beam pipeline in Python, and I'm encountering a NameError when running the pipeline on Google Dataflow. The error specifically mentions that 'json_encoder' is not defined. The pipeline works fine when running it locally.

Here's the gist of the code:

import apache_beam as beam
import decimal

# Simplified Apache Beam pipeline step
input | "Convert to string" >> beam.Map(encode_as_task)

def encode_as_task(element, cache_enabled=False):
    import orjson
    # Convert the element to a string before publishing to Pub/Sub
    task = dict()
    tasks = []
    task['task'] = element[2][0]
    task['task'].pop('client_id', None)
    task['clientIds'] = element[1]
    tasks.append(task)
    task_generator_request = {"tasks": tasks, "cacheEnabled": cache_enabled}
    message = orjson.dumps(task_generator_request, default=json_encoder)
    return message

def json_encoder(value):
    if isinstance(value, decimal.Decimal):
        return float(value)
    raise TypeError

This code works well locally, but when it runs on Cloud Dataflow I get the following error: NameError: name 'json_encoder' is not defined.

I previously ran into similar errors with external dependencies such as orjson. I resolved those by importing the package directly inside the function, as with import orjson in encode_as_task.

But since json_encoder is a function defined in my own module rather than an installable package, I'm not sure how to handle or import it in the same way.

Side note: I pass the requirements file with the Beam runtime argument --requirements_file=requirements.txt so that Apache Beam and Dataflow know about the dependencies, but I still face this issue.


Solution

  • Can you pass --save_main_session to the pipeline options? Saving the main session pickles the functions, imports, and variables defined in the main module (__main__) so that they can be unpickled on the Dataflow workers.

    For more info on save_main_session, please take a look at https://beam.apache.org/releases/pydoc/2.51.0/_modules/apache_beam/options/pipeline_options.html