I'm trying to run an Apache Beam pipeline on Google Dataflow. This pipeline reads data from Google BigQuery, adds a schema, converts it to a DataFrame, and performs a transformation on that DataFrame using a third-party library (scrubadub).
From the Google Cloud CLI on GCP, I run:
/usr/bin/python /home/test_user/dataflow_beam_test.py --requirements_file /home/test_user/requirements.txt
Following the instructions under "PyPi Dependencies" here, my requirements.txt file contains (among other packages):
scrubadub==2.0.0
I haven't been able to get the pipeline to install the third-party Python library I'm working with (scrubadub) onto the remote workers. I've verified this package works locally.
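(For example, a quick local sanity check along these lines runs fine in my environment; the sample text is just illustrative:)
python -c "import scrubadub; print(scrubadub.clean('contact me at jane.doe@example.com'))"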
Here is the relevant code:
with beam.Pipeline(argv=argv) as p:
    pcoll = (p | 'read_bq_view' >> beam.io.ReadFromBigQuery(query=BIGQUERY_SELECT_QUERY, use_standard_sql=True)
               | 'ToRows' >> beam.Map(lambda x: beam.Row(id=x['id'], user_id=x['user_id'], query=x['query']))
            )

    df = beam_convert.to_dataframe(pcoll)
    df['query'] = df['query'].apply(lambda x: scrubadub.clean(x))
The last line in this code block is what causes the error (I've confirmed this by commenting it out and running the pipeline successfully).
I've tried importing scrubadub at the top level of the file and as part of my run() function; both throw the same error:
  File "/usr/local/lib/python3.9/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'scrubadub'
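For reference, these are roughly the two placements I tried (the surrounding code is abbreviated):

# Placement 1: module-level import at the top of dataflow_beam_test.py
import scrubadub

# Placement 2: import inside the run() function that builds the pipeline
def run(argv=None):
    import scrubadub
    ...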
Notably, it doesn't seem like pip install -r requirements.txt is ever running on the workers.
To solve your issue and use PyPI packages with Beam, I recommend using a setup.py file, for example:
from setuptools import find_packages, setup

setup(
    name="dataflow_job",
    version="0.0.1",
    install_requires=[
        'scrubadub==2.0.0'
    ],
    packages=find_packages(),
)
For example, put this file at the root of the project.
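One possible layout is sketched below; the module and folder names simply mirror the illustrative command that follows, and the __init__.py files are what let find_packages() pick the code up so it is packaged and shipped to the workers along with the declared dependencies:

your_project/
├── setup.py
└── your_module/
    ├── __init__.py
    └── your_folder/
        ├── __init__.py
        └── main.py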
Then, in the command line that launches the Beam job with Dataflow, use the setup_file program argument:
python -m your_module.your_folder.main \
--runner=DataflowRunner \
--setup_file=./setup.py \
....
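For context, a fuller launch command might look like this; the project, region, and bucket values are placeholders to adapt to your environment:

python -m your_module.your_folder.main \
    --runner=DataflowRunner \
    --project=your-gcp-project \
    --region=us-central1 \
    --temp_location=gs://your-bucket/tmp \
    --setup_file=./setup.py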
It's important to follow these points to launch your job correctly:

The environment that launches the job needs the pipeline's packages installed (in this example, with Beam 2.45.0): apache_beam==2.45.0 and scrubadub==2.0.0

The versions of these packages should be the same as the packages used by the workers; the runner that instantiates the job and the setup.py file should pin the same versions. In the setup.py you don't have to install Beam, because it's given by the runtime environment.
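For example, the environment from which you launch the job could be prepared with something along these lines (the [gcp] extra pulls in the Dataflow runner and the BigQuery IO used by the pipeline):

pip install "apache-beam[gcp]==2.45.0" scrubadub==2.0.0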