
Dataflow Job is failing within 40 Seconds


I have a simple Google Cloud HTTP-triggered function which is responsible for triggering a Dataflow runner job that loads data from a CSV on Cloud Storage into a BigQuery table.

My code is given below:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import SetupOptions, PipelineOptions

PROJECT = 'proj'
BUCKET='BUCKET'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT,ibu:STRING,brewery_id:STRING'
DATAFLOW_JOB_NAME = 'jobname'


def execute(request):
    argv = [
      '--project={0}'.format(PROJECT),
      '--job_name={0}'.format(DATAFLOW_JOB_NAME),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=europe-west2',
      '--runner=DataflowRunner'
    ]

    #p = beam.Pipeline(argv=argv)
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)
    input = 'gs://{0}/beers.csv'.format(BUCKET)
    print ('step-222')

    (p | 'ReadData' >> beam.io.ReadFromText(input, skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table='data',
           dataset='sandbox',
           project=PROJECT,
           schema=SCHEMA,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
           ))
    p.run()
    return "success"

The function runs successfully and it also creates a Dataflow job, but the Dataflow job fails within 40 seconds without creating the graph view, and reports an error.


Solution

  • As @captainnabla said in his comment, you have to create a subnetwork and pass it as an option to your Dataflow job.

    • Solution 1

    In the default VPC of the project, create the subnetwork for Dataflow

    If you don’t specify a subnetwork, the Dataflow job usually uses the project’s default VPC network. I don’t know why this didn’t work in your case (maybe the default network taken by the job is outside of the project executing the job). A sketch for creating the subnetwork programmatically is given after this list.

    • Solution 2

    Create another VPC for your data pipelines and a subnetwork for Dataflow

    The network configuration depends on your team’s strategy.

    In both solutions, you can pass the subnetwork as a program argument to your Dataflow job:

    --subnetwork=https://www.googleapis.com/compute/v1/projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{SUBNETWORK}
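
    For example, a minimal sketch of the argv list from the function in the question with the subnetwork option added (the subnetwork name my-dataflow-subnet is a placeholder for the one you created, in the same region as the job):

    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name={0}'.format(DATAFLOW_JOB_NAME),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=europe-west2',
        # Placeholder subnetwork name; replace with your own subnetwork.
        '--subnetwork=https://www.googleapis.com/compute/v1/projects/{0}/regions/europe-west2/subnetworks/{1}'.format(PROJECT, 'my-dataflow-subnet'),
        '--runner=DataflowRunner'
    ]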
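
    For Solution 1, the subnetwork itself can be created in the Cloud Console, with gcloud, or programmatically. Below is a minimal sketch using a recent version of the google-cloud-compute client library; the subnetwork name and IP range are assumptions, not values from the question:

    from google.cloud import compute_v1

    def create_dataflow_subnet(project_id, region):
        # Placeholder subnetwork definition inside the project's default VPC.
        subnet = compute_v1.Subnetwork(
            name='my-dataflow-subnet',
            ip_cidr_range='10.10.0.0/24',
            network='projects/{0}/global/networks/default'.format(project_id),
            private_ip_google_access=True,
        )
        client = compute_v1.SubnetworksClient()
        operation = client.insert(
            project=project_id, region=region, subnetwork_resource=subnet)
        operation.result()  # wait for the subnetwork to be created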