Search code examples
pythongoogle-cloud-platformgoogle-cloud-dataflowapache-beamgoogle-cloud-datalab

How can I run a Dataflow job from Datalab in Python?


I'm having a bit of trouble running a Dataflow job from Datalab. What I could do with is a minimal working Python code example for this situation as this doesn't seem to be available from the Google Cloud Platform or Apache Beam documentation.

It would be really helpful for me to see some Python code that I can run from a Datalab cell that does the following.

# 1. Sets up the job

# 2. Defines the processing logic to be applied to the input data files

# 3. Saves the processed files to an output folder

# 4. Submits the job to Google Cloud Dataflow

To work this out, I have tried to play around with the word count examples from the Google and Apache docs and adapt them for use in Datalab. The code for this is as follows but it isn't clear to me what bits I can strip out to turn it into a truly minimal working example.

from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      dest='input',
                      default='gs://data-analytics/kinglear.txt',
                      help='Input file to process.')
  parser.add_argument('--output',
                      dest='output',
                      default='gs://data-analytics/output',
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_args.extend([
      '--runner=DataflowRunner',
      '--project=project',
      '--staging_location=gs://staging',
      '--temp_location=gs://tmp',
      '--job_name=your-wordcount-job',
  ])

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | ReadFromText(known_args.input)

    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word_count):
      (word, count) = word_count
      return '%s: %s' % (word, count)

    output = counts | 'Format' >> beam.Map(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | WriteToText(known_args.output)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Thanks in advance!

Josh


Solution

  • I worked out this problem with the help of the tutorials here: https://github.com/hayatoy/dataflow-tutorial and can now launch a Dataflow job from Datalab with the following code.

    import apache_beam as beam
    
    # Pipeline options:
    options                         = beam.options.pipeline_options.PipelineOptions()
    gcloud_options                  = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
    gcloud_options.job_name         = 'test'
    gcloud_options.project          = 'project'
    gcloud_options.staging_location = 'gs://staging'
    gcloud_options.temp_location    = 'gs://tmp'
    gcloud_options.region           = 'europe-west2'
    
    # Worker options:
    worker_options                  = options.view_as(beam.options.pipeline_options.WorkerOptions)
    worker_options.disk_size_gb     = 30
    worker_options.max_num_workers  = 10
    
    # Standard options:
    options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
    
    # Pipeline:
    PL = beam.Pipeline(options=options)
    
    (
          PL | 'read'  >> beam.io.ReadFromText('gs://input.txt')
             | 'write' >> beam.io.WriteToText ('gs://output.txt', num_shards=1)
    )
    
    PL.run()
    

    Thanks,

    Josh