
Pass a dynamic write disposition in Apache Beam / Dataflow


I have a process that I am trying to automate. I already set the table destination dynamically in the WriteToBigQuery class, and I want to do the same for the write_disposition parameter.

Is it possible to do this?

My (not working) code:

    import argparse

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # CustomCoder, DynamicArgs, castSchema and the DoFn classes used below are
    # defined elsewhere in the project; argv comes from the enclosing function.

    # Parser + coder
    parser = argparse.ArgumentParser()
    coder = CustomCoder()

    # Parsing args
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions()
    dynamics_args = pipeline_options.view_as(DynamicArgs)

    # Pipeline init
    p1 = beam.Pipeline(argv=pipeline_args, options=pipeline_options)

    # Processing
    lines = (
        p1
        | "Reading file" >> beam.io.ReadFromText(
            dynamics_args.dataLocation,
            skip_header_lines=1,
            strip_trailing_newlines=True,
            coder=coder)
        | "Cleaning file" >> beam.ParDo(CleanFile())
        | "Parsing file" >> beam.ParDo(ParseCSV(dynamics_args.sep))
        | "Mapping schema" >> beam.ParDo(MappingSchema(dynamics_args.destinationSchema))
        | "Cleaning fields" >> beam.ParDo(CleanFields())
        | "Converting numeric" >> beam.ParDo(ConvertNumericToBqFormat(dynamics_args.destinationSchema))
        | "Converting dates" >> beam.ParDo(ConvertDatesToBqFormat(dynamics_args.destinationSchema))
        | "Adding partition field" >> beam.ParDo(AddPartitionField())
        | "Fill null values" >> beam.ParDo(FillNullValues())
    )

    # Wrap the write disposition as a side input (not used by the write step below)
    writeDisposition = p1 | "Get write disposition" >> beam.Create([dynamics_args.writeDisposition])
    writeDisposition = beam.pvalue.AsSingleton(writeDisposition)

    # Write to BigQuery, taking the write disposition from the pipeline arguments
    lines | "Writing data" >> beam.io.WriteToBigQuery(
        dynamics_args.destinationTable,
        schema=lambda x: castSchema(dynamics_args.destinationSchema.get()),
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=dynamics_args.writeDisposition,
        additional_bq_parameters={'timePartitioning': {'type': 'DAY', 'field': 'periode'}})

    p1.run()

Solution

  • Your code looks correct. Whatever write disposition you specify on the command line when you invoke this code should be the write disposition used by the pipeline.
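
    To illustrate the command-line approach, here is a minimal sketch of reading the write disposition with argparse before the pipeline is built. It assumes the value is known when the pipeline is constructed (so it does not cover a runtime template parameter). The helper name `parse_write_disposition`, the constant `ALLOWED_WRITE_DISPOSITIONS`, and the `WRITE_APPEND` default are hypothetical choices for this example, not part of the original code.

```python
import argparse

# String values of the write constants on beam.io.BigQueryDisposition.
ALLOWED_WRITE_DISPOSITIONS = ("WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")


def parse_write_disposition(argv=None):
    """Return (write_disposition, remaining_args) parsed from argv.

    Hypothetical helper: unknown flags are left untouched so they can be
    forwarded to the pipeline options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--writeDisposition",
        choices=ALLOWED_WRITE_DISPOSITIONS,
        default="WRITE_APPEND",  # assumed default for this sketch
        help="BigQuery write disposition to use for this run.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args.writeDisposition, pipeline_args


# Example invocation with an explicit argument list.
disposition, pipeline_args = parse_write_disposition(
    ["--writeDisposition", "WRITE_TRUNCATE", "--runner", "DirectRunner"]
)
print(disposition)
print(pipeline_args)
```

    The returned string could then be passed as `write_disposition=disposition` to `beam.io.WriteToBigQuery`, and `pipeline_args` forwarded to `PipelineOptions`. Restricting the flag with `choices` makes a mistyped value fail at launch rather than partway through the job.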