I am testing a very simple Apache Beam pipeline on the Google Cloud Dataflow runner that reads audio elements from Pub/Sub, runs each element through a TensorFlow model, and writes the result back to Pub/Sub. For the test I have disabled autoscaling and set the number of workers to 1. I haven't figured out how to get Cloud Profiler working yet, so I added logging in several places to generate a "profile" of the various parts of my pipeline, including inside the main DoFn that processes the audio, like so:
import logging

import apache_beam as beam


class ProcessAudio(beam.DoFn):
    def setup(self):
        ...  # do some setup stuff

    def process(self, element):
        chunk_num = element.attrs['chunk_num']
        logging.info(f'chunk_{chunk_num} processing')
        prediction = ...  # do processing
        logging.info(f'chunk_{chunk_num} processing_completed')
        return [prediction]
# here's the actual pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'ReadChunks' >> beam.io.ReadFromPubSub(
            subscription=read_subscription_name,
            with_attributes=True,
        )
        | 'DecodeChunks' >> beam.Map(decode_chunk)
        | 'RunModel' >> beam.ParDo(ProcessAudio())
        | 'EncodeChunks' >> beam.Map(encode_chunk)
        | beam.io.WriteToPubSub(write_topic_name, with_attributes=True)
    )
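For completeness, pipeline_options looks roughly like this; the project, region, and bucket values below are placeholders rather than my real ones, but this is where autoscaling is disabled and the worker count is pinned to a single n1-standard-2:

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative only -- project/region/bucket values are placeholders.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--streaming',
    '--autoscaling_algorithm=NONE',        # autoscaling disabled for the test
    '--num_workers=1',                     # single worker
    '--worker_machine_type=n1-standard-2',
])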
With these logs I created my profile. Here's what I get when the pipeline processes 30 elements: the red sections represent the time each chunk (element) spends processing in the main DoFn.

Unless I've done something screwy, this makes it look like all of the elements are being processed concurrently by the single worker I have provided, which is an n1-standard-2 machine with only two vCPUs. Why would Beam run all of these elements in parallel on only two threads? I'd have expected it to process at most 2 elements concurrently, and the way it is processing things seems like it would cause a lot of context switching and be an inefficient use of the 2 vCPUs it has. Is there a way I can encourage it not to do this? Or is this actually efficient/OK for some reason? I'm just trying to understand how to make this as efficient and fast as possible.
OK, I figured this out.
There is a useful tidbit buried in the OOM troubleshooting docs for Dataflow. There is a table there that seems to indicate Dataflow assigns 1 process per vCPU, 12 threads per process, and 1 DoFn per thread, meaning I should be seeing up to 24 concurrent DoFns per worker, which isn't far from the mark.
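In other words, the rough ceiling that table implies for an n1-standard-2 works out like this:

# Back-of-the-envelope ceiling implied by the OOM troubleshooting table
# for an n1-standard-2 worker (2 vCPUs).
vcpus = 2
processes_per_vcpu = 1
threads_per_process = 12
max_concurrent_dofns = vcpus * processes_per_vcpu * threads_per_process  # 24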
Also, per the docs for the --number_of_worker_harness_threads option to Dataflow, by default "...the Dataflow service determines an appropriate number of threads per worker".
So one thing I did was set --number_of_worker_harness_threads=2. I'm not 100% sure what exactly a worker harness is, or whether one harness equals one worker, but setting this to 2 decreased the per-worker parallelism dramatically and actually increased the overall speed of my pipeline slightly, which is what I was hoping would happen.
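Concretely, I just appended the flag to the same kind of options list shown in the question (the omitted options are the same placeholders as before):

from apache_beam.options.pipeline_options import PipelineOptions

# Same illustrative options as before, with the thread cap appended.
pipeline_options = PipelineOptions([
    # ... runner/project/worker options as in the question ...
    '--number_of_worker_harness_threads=2',
])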
When I re-processed my profile, I still noticed Dataflow processing 4 elements in parallel on one worker. The final piece of the puzzle was the --experiments=no_use_multiple_sdk_containers option: it seems Dataflow will also run multiple instances of the SDK on one worker machine unless you tell it not to, so I set that as well (sketched below).
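A minimal sketch of the two knobs together, with the rest of the options left as the same placeholders as before:

from apache_beam.options.pipeline_options import PipelineOptions

# Cap DoFn threads per SDK process, and keep Dataflow from starting
# multiple SDK processes on one worker VM.
pipeline_options = PipelineOptions([
    # ... runner/project/worker options as in the question ...
    '--number_of_worker_harness_threads=2',
    '--experiments=no_use_multiple_sdk_containers',
])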
With both options set, I ended up with the following profile:
That's more like what I was expecting. However, as the docs warn, I can see I've probably backed off too far: the pipeline is now using only about 70% of the worker's CPU. It's probably bottlenecked by Python's GIL at this point, but at least I know which knobs to turn to fix it.