I'm trying to create a Dataflow (batch) job that reads one file per hour from Google Cloud Storage, parses it and writes an entry to a BigQuery table. The file is a .json in which each line is a complex JSON object.
I've created a simple pipeline:
(p
 | "Read file" >> beam.io.ReadFromText(custom_options.file_name)
 | "Parse line json" >> beam.Map(parse)
 | "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
       table=custom_options.table))
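For reference, a simplified sketch of how custom_options can be defined (assuming a standard PipelineOptions subclass; only --file_name and --table come from the pipeline above, the class name and help texts are illustrative):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Input file on GCS and destination BigQuery table, passed on the command line.
        parser.add_argument("--file_name", help="gs:// path of the input .json file")
        parser.add_argument("--table", help="Destination table, e.g. project:dataset.table")

custom_options = PipelineOptions().view_as(CustomOptions)
p = beam.Pipeline(options=custom_options)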
The parse function looks like this:
import json

def parse(input_elem):
    # Each input line is one JSON document.
    log = json.loads(input_elem)
    result = {}
    ...  # Get some fields from the input and add them to "result"
    return result
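As a side note, if the destination table might not exist when the job runs, the WriteToBigQuery step also needs a schema; a hedged sketch with placeholder field names (the real job extracts different fields):

| "Write in BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
    table=custom_options.table,
    # Placeholder schema; it must match the dict returned by parse().
    schema="timestamp:TIMESTAMP,user_id:STRING,payload:STRING",
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)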
The pipeline works fine with a file of 100 MB and 70K lines (approx. 5 min per job). But as the file grows, the Dataflow job takes longer (15 min for 200-300 MB) or does not finish and fails (for more than 1.5 GB and 350K lines).
I've run some tests: when I create a sample JSON inside the parse function instead of using input_elem, the Dataflow job works fine and creates a row for each entry in 7-8 min.
I don't know where the problem in the pipeline is. Has anyone had a similar issue?
We have finally solved the problem. In parallel with the Dataflow work, some VPC networks had been created for the application, and the firewall rules were not correctly configured.
This case is similar to the one described in the documentation ("The VPC network used for your job might be missing"). The rules existed but were not configured correctly.
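For anyone hitting the same symptom: Dataflow workers on a custom VPC network need a firewall rule that allows TCP traffic between the worker VMs on ports 12345 and 12346, otherwise batch jobs can hang and eventually fail. Roughly, following the documentation (the rule and network names here are placeholders):

gcloud compute firewall-rules create allow-dataflow-workers \
    --network=YOUR_NETWORK \
    --action=allow \
    --direction=ingress \
    --target-tags=dataflow \
    --source-tags=dataflow \
    --priority=0 \
    --rules=tcp:12345-12346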
Thanks!