Tags: dataframe, google-bigquery, schema, google-cloud-dataflow, apache-beam

Apache Beam PCollection Schema (converting to Dataframe)


I'm trying to grab some data from BigQuery, update a couple of columns using Python, and then write it to another location. I'm using Apache Beam (via Google Dataflow) to do this.

Once I have the data from BigQuery as a PCollection, I want to convert it to a Beam DataFrame so I can update the relevant columns.

However, in order to do so, I need to ensure the PCollection is schema-aware. Even following the Beam documentation, I'm having trouble getting this to work.

I've tried following the pattern discussed in this post: Apache Beam infer schema using NamedTuple (Python) and here: https://beam.apache.org/documentation/programming-guide/#inferring-schemas

Schema Named Tuple Code:

import typing

from apache_beam import coders

class TestSchema(typing.NamedTuple):
  entry_id: str
  entry_name: int
  user_id: str
  user_entry: str
  document: str

coders.registry.register_coder(TestSchema, coders.RowCoder)
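
As a quick sanity check, the coder registry can be asked what it will use for TestSchema; if the registration above took effect, this should print a RowCoder:

from apache_beam import coders

# Expect a RowCoder built from TestSchema's fields if registration succeeded.
print(coders.registry.get_coder(TestSchema))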

Here's some test pipeline code in my run function (some boilerplate omitted):

with beam.Pipeline(argv=argv) as p:
    pcoll = (p
             | 'read_bq_view' >> beam.io.ReadFromBigQuery(
                 query=BIGQUERY_SELECT_QUERY, use_standard_sql=True)
             | 'schematize' >> beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema)
             | beam.Map(print))

When I run this, I get:

NameError: name 'TestSchema' is not defined [while running 'schematize-ptransform-49']

What am I missing here? The class TestSchema exists and is in scope (it's at the top level of my file). I've tried using the traditional NamedTuple syntax in case I was messing up the class syntax, but I get the same error.
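
For reference, the traditional (functional) form I mean is:

import typing

# Functional NamedTuple syntax; equivalent to the class-based form above
# for Beam's schema inference.
TestSchema = typing.NamedTuple('TestSchema', [
    ('entry_id', str),
    ('entry_name', int),
    ('user_id', str),
    ('user_entry', str),
    ('document', str),
])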


Solution

  • According to the documentation, you can use beam.Row in this case. For example:

    import apache_beam as beam
    from apache_beam.dataframe.convert import to_dataframe, to_pcollection
    from apache_beam.testing.test_pipeline import TestPipeline

    def test_pipeline(self):
        with TestPipeline() as p:
            # Mock input simulating a row retrieved from BigQuery.
            inputs = {
                'entry_id': 'ffff',
                'entry_name': 4,
                'user_id': 'ddddd',
                'user_entry': 'dDDFFF',
                'document': 'VVBBB'
            }

            # beam.Row attaches a schema to each element of the PCollection.
            entries = (p
                       | 'Inputs' >> beam.Create([inputs])
                       | 'ToRows' >> beam.Map(lambda user_entry: beam.Row(
                           entry_id=user_entry['entry_id'],
                           entry_name=user_entry['entry_name'],
                           user_id=user_entry['user_id'],
                           user_entry=user_entry['user_entry'],
                           document=user_entry['document'])))

            # Convert the schema'd PCollection to a deferred DataFrame.
            df = to_dataframe(entries)

            # Deferred DataFrames can also be converted back to schema'd PCollections.
            collection = to_pcollection(df, include_indexes=False)

            collection | 'Print' >> beam.Map(print)
    
    • I created a mock input to simulate the data retrieved from BigQuery
    • I used beam.Row in a Map to add a schema to the PCollection
    • I converted the PCollection to a deferred DataFrame (and back); a sketch of the actual column updates follows below
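
Since the original goal was to update a couple of columns, here is a minimal end-to-end sketch of that step. It reuses the same mock input, and the two column edits (lower-casing user_entry, incrementing entry_name) are illustrative stand-ins for whatever transformation your pipeline actually needs:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as p:
    # Mock input standing in for a row retrieved from BigQuery.
    inputs = {
        'entry_id': 'ffff',
        'entry_name': 4,
        'user_id': 'ddddd',
        'user_entry': 'dDDFFF',
        'document': 'VVBBB'
    }

    entries = (p
               | 'Inputs' >> beam.Create([inputs])
               | 'ToRows' >> beam.Map(lambda e: beam.Row(
                   entry_id=e['entry_id'],
                   entry_name=e['entry_name'],
                   user_id=e['user_id'],
                   user_entry=e['user_entry'],
                   document=e['document'])))

    df = to_dataframe(entries)

    # Hypothetical column updates on the deferred DataFrame; substitute your own logic.
    df['user_entry'] = df['user_entry'].str.lower()  # normalize a string column
    df['entry_name'] = df['entry_name'] + 1          # bump an integer column

    updated = to_pcollection(df, include_indexes=False)
    updated | 'Print' >> beam.Map(print)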