I have an apache-beam based Dataflow job that uses the vcf source to read a single text file (stored in Google Cloud Storage), transform the text lines into Datastore entities,
and write them into the Datastore sink. The workflow works fine, but the issue I noticed is this: I launched the job with
--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100
but the execution seems to prefer one worker (see the graph below: the target number of workers briefly increased to 2 but then dropped back to 1 "based on the ability to parallelize the work in the currently running step"). I did not use an ancestor path for the keys; all the entities are of the same kind.
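For reference, those flags can also be passed programmatically when building the pipeline options. This is only an illustrative sketch; the project id and bucket path are placeholders, not the values from my actual job:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder project/bucket values; the autoscaling flags are the ones quoted above.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/tmp',
    '--autoscalingAlgorithm=THROUGHPUT_BASED',
    '--numWorkers=10',
    '--maxNumWorkers=100',
])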
The pipeline code looks like this:
def write_to_datastore(project, user_options, pipeline_options):
  """Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
     | 'Create my entity' >> beam.ParDo(
         ToEntityFn(), user_options.kind)
     | 'Write to datastore' >> WriteToDatastore(project))
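ToEntityFn is a custom DoFn I wrote (not shown in full here). A simplified sketch of the idea, modeled on Beam's datastore_wordcount example, looks roughly like the following; the random-UUID key name and the single 'content' property are placeholders for illustration, not my actual code:

import uuid

import apache_beam as beam
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper


class ToEntityFn(beam.DoFn):
  """Converts one parsed VCF record into a Datastore entity proto."""

  def process(self, element, kind):
    entity = entity_pb2.Entity()
    # No ancestor path: every entity gets its own random key of the given kind.
    datastore_helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
    # Placeholder property; the real DoFn maps the record's fields to properties.
    datastore_helper.add_properties(entity, {'content': str(element)})
    yield entity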
Because I have millions of rows to write into Datastore, writing at about 30 entities/sec would take far too long.
Question: The input is just one huge gzipped file. Do I need to split it into multiple smaller files to trigger multiple workers? Is there any other way to make the import faster? Am I missing something in the num_workers
setup? Thanks!
I looked into the design of vcfio. I suspect (if I understand correctly) that the reason I always get one worker when the input is a single file is a limitation of _VcfSource
combined with a constraint of the VCF format: the format has a header section that defines how to interpret the non-header lines, so each worker that reads the source has to process the entire file. When I split the single file into 5 separate files that share the same header, I successfully get up to 5 workers (but no more, probably for the same reason).
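For what it's worth, the pre-split workaround just means pointing the read at a file pattern instead of a single path. A rough sketch, assuming each split file repeats the original header and they share a common prefix (the bucket path is a placeholder):

(p
 | 'Read split vcf files' >> vcfio.ReadFromVcf('gs://my-bucket/splits/part-*.vcf')
 | 'Create my entity' >> beam.ParDo(ToEntityFn(), user_options.kind)
 | 'Write to datastore' >> WriteToDatastore(project))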
One thing I don't understand is that the number of workers doing the read may be limited to 5 (in this case), but why are we limited to only 5 workers for the write as well? Anyway, I think I have found an alternative way to trigger multiple workers with the beam Dataflow runner (use pre-split VCF files). There is also a related approach in the gcp variant transforms project, in which vcfio
has been significantly extended; it appears to support multiple workers with a single input VCF file. I hope the changes in that project can be merged into the beam project too.
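One guess (on my part, not something I have confirmed) about why the write is capped at the read's parallelism is step fusion on Dataflow: if the read, transform, and write are fused into one stage, the write cannot use more workers than the read. If your Beam version has beam.Reshuffle, a common way to break fusion is to insert it between the transform and the write, roughly like this:

(p
 | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
 | 'Create my entity' >> beam.ParDo(ToEntityFn(), user_options.kind)
 # Breaking fusion here lets Dataflow redistribute the entities before the
 # write step, so the write is no longer tied to the read's parallelism.
 | 'Prevent fusion' >> beam.Reshuffle()
 | 'Write to datastore' >> WriteToDatastore(project))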