Tags: python, google-bigquery, google-cloud-dataflow, apache-beam

Dynamically choose BigQuery tablename in Apache Beam pipeline


I'm building an Apache Beam pipeline on GCP Dataflow to process incoming events that need to be written to separate BigQuery tables depending on the content of each event. The decision of which table the data should be written to happens in one of the stages of the pipeline. My problem is how to dynamically set the name of the table the data needs to go into. Also, in some cases, data needs to be written to two tables after applying a transform.

I have gone through the solutions posted in the links below, but they seem to target older versions of google-cloud/apache-beam and are not working for me:

  1. Dynamically set bigquery table id in dataflow pipeline
  2. Writing different values to different BigQuery tables in Apache Beam

Attaching a sample pipeline using DirectRunner, where I tried to follow the second link above:

#Standard Python Imports
import argparse
import logging
import json
    
#3rd Party Imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def transform_entry(line):
    # Parse one line of newline-delimited JSON into a dict.
    return json.loads(line)

def getTableName(entry):
    # Pick the destination table based on the event's 'tablename' field.
    if entry["tablename"] == "table1":
        return "table1"
    else:
        return "table2"

def getRow(entry):
    # Strip the event down to just the payload row.
    return entry["dataRow"]

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument('--temp_location',
                        default='<<TEMPORARY LOCATION>>')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:

        writeData = (p
                | 'ReadInput' >> beam.io.ReadFromText('./sample_input.json')
                | 'Parse'   >> beam.Map(transform_entry))

        eventRow = (writeData
                | 'Get Data Row' >> beam.Map(getRow)
                | 'Write Event Row' >> beam.io.gcp.bigquery.WriteToBigQuery(
                        project='<<GCP PROJECT>>',
                        dataset='<<DATASET NAME>>',
                        table=getTableName,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                        ))

        print(eventRow)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.ERROR)
    run()

Could someone please help me out with this?

Attaching the traceback here:

/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py:1992: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
<apache_beam.io.gcp.bigquery.WriteResult object at 0x7f2421660100>
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1571, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1521, in process
    yield (self.destination(element, *side_inputs), element)
  File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 85, in getTableName
KeyError: 'tablename'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 56, in <module>
    run()
  File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 37, in run
    with beam.Pipeline(options=pipeline_options) as p:
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 553, in run
    self._options).run(False)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 222, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 453, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 781, in _execute_bundle
    self._run_bundle(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1010, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1346, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
    return getattr(self, request_type)(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 817, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1571, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1521, in process
    yield (self.destination(element, *side_inputs), element)
  File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 85, in getTableName
KeyError: "tablename [while running 'Write Event Row/_StreamToBigQuery/AppendDestination']"


Solution

  • Your function, and the way you apply a dynamic table name based on the current element of the PCollection via a callable, are correct, but there is a problem with the elements in your PCollection.

    You have a KeyError on the dicts inside your PCollection: the key tablename is not present. Note that your 'Get Data Row' step replaces each element with entry["dataRow"] before the write, so by the time getTableName runs it receives the inner row, which no longer carries the tablename key.

    You can replace ReadFromText with a mock to make sure the expected key is present and that your input PCollection of dicts is created as expected: you can use beam.Create([{'field_name': 'field_value'}]) for example.

    That makes it easier to test the write-to-BigQuery part with a dynamic table name; see the sketches below.
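
    A minimal sketch of that suggestion, assuming placeholder names (my-project, my_dataset) and an illustrative flat payload; it also assumes the destination tables already exist with a matching schema:

import apache_beam as beam

def run_test():
    with beam.Pipeline() as p:
        # Mock input: every element is guaranteed to carry the
        # 'tablename' key that the destination callable needs.
        events = p | 'MockInput' >> beam.Create([
            {'tablename': 'table1', 'name': 'alice', 'score': 10},
            {'tablename': 'table2', 'name': 'bob', 'score': 20},
        ])

        # WriteToBigQuery hands each element to the table callable and
        # writes that same element as the row, so keep the routing key
        # on the element until it has been used (here it would also be
        # written as a column unless the schema/cleanup accounts for it).
        _ = (events
             | 'WriteDynamic' >> beam.io.WriteToBigQuery(
                     table=lambda row: 'my-project:my_dataset.%s' % row['tablename'],
                     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

if __name__ == '__main__':
    run_test()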
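
    For the other requirement in the question, writing one event to two tables after a transform, one option is a multi-output DoFn with tagged outputs and a separate fixed-table sink per tag. A sketch, where the also_table2 flag is hypothetical and not part of the question's data:

import apache_beam as beam
from apache_beam import pvalue

class RouteRows(beam.DoFn):
    def process(self, entry):
        # Emit the payload on the tag matching its destination table.
        tag = 'table1' if entry['tablename'] == 'table1' else 'table2'
        yield pvalue.TaggedOutput(tag, entry['dataRow'])
        # Hypothetical flag: duplicate the payload to the second table.
        if entry.get('also_table2'):
            yield pvalue.TaggedOutput('table2', entry['dataRow'])

# routed = parsed | beam.ParDo(RouteRows()).with_outputs('table1', 'table2')
# routed.table1 | beam.io.WriteToBigQuery('<<GCP PROJECT>>:<<DATASET NAME>>.table1', ...)
# routed.table2 | beam.io.WriteToBigQuery('<<GCP PROJECT>>:<<DATASET NAME>>.table2', ...)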