I have a static pipeline with the following architecture:
main.py
setup.py
requirements.txt
module 1
__init__.py
functions.py
module 2
__init__.py
functions.py
dist
setup_tarball
The setup.py and requirements.txt contain the non-native PyPI and local functions which would be used by the Dataflow worker node. The dataflow options are written as follows:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from module2.functions import function_to_use
dataflow_options = ['--extra_package=./dist/setup_tarball','temp_location=<gcs_temp_location>', '--runner=DataflowRunner', '--region=us-central1', '--requirements_file=./requirements.txt]
So then the pipeline will run something like this:
options = PipelineOptions(dataflow_options)
p = beam.Pipeline(options=options)
transform = (p | ReadFromText(gcs_url) | beam.Map(function_to_use) | WriteToText(gcs_output_url))
Running this locally takes Dataflow around 6 minutes to complete, where most of the time goes to worker startup. I tried getting this code automated with Composer and re-arranged the architecture as follows: my main (dag) function in dags folder, the modules in plugins, and setup_tarball and requirements.txt in data folder... So the only parameters that really changed are:
'--extra_package=/home/airflow/gcs/data/setup_tarball'
'--requirements_file=/home/airflow/gcs/data/requirements.txt'
When I try running this modified code in Composer, it will work... but it'll take much, much longer... Once the worker starts up, it will take anywhere from 20-30 minutes before actually running the pipeline (which is only a few seconds).. This is much longer than triggering Dataflow from my local code, which was taking only 6 minutes to complete. I realize this question is very general, but since the code works, I don't think it's related to the Airflow task itself. Where would be a reasonable place to start looking at for troubleshooting this problem? At the Airflow level, what can be modified? How does Composer (Airflow) interact with Dataflow, and what can potentially cause this bottleneck?
It turns out that the problem was associated with Composer itself. The fix was to increase the capacity of Composer, i.e., increase vCPUs. Not sure why this would be the case, so if anyone has an idea for the foundation behind this issue, your input would be much appreciated!