I'm trying to grab some data from BigQuery, update a couple of columns using Python, and then write it to another location. I'm using Apache Beam (via Google Dataflow) to do this.
Once I have the data from BigQuery as a PCollection, I want to convert it to a Beam DataFrame so I can update the relevant columns.
However, in order to do so, I need to ensure the PCollection is schema-aware. Even following the Beam documentation, I'm having trouble doing this.
I've tried following the pattern discussed in this post: Apache Beam infer schema using NamedTuple (Python) and here: https://beam.apache.org/documentation/programming-guide/#inferring-schemas
Schema NamedTuple code:
import typing

import apache_beam as beam
from apache_beam import coders


class TestSchema(typing.NamedTuple):
    entry_id: str
    entry_name: int
    user_id: str
    user_entry: str
    document: str


coders.registry.register_coder(TestSchema, coders.RowCoder)
Here's some test pipeline code in my run function (some boilerplate omitted):
with beam.Pipeline(argv=argv) as p:
    pcoll = (p
             | 'read_bq_view' >> beam.io.ReadFromBigQuery(
                 query=BIGQUERY_SELECT_QUERY, use_standard_sql=True)
             | "schematize" >> beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema)
             | beam.Map(print)
             )
When I run this, I get:
NameError: name 'TestSchema' is not defined [while running 'schematize-ptransform-49']
What am I missing here? The class TestSchema exists and is in scope (it's at the top level of my file). I've tried using the traditional NamedTuple syntax in case I was messing up the class syntax, but I get the same error.
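For reference, the traditional (functional) NamedTuple syntax I tried looks roughly like this, with the same fields as above:

TestSchema = typing.NamedTuple('TestSchema', [
    ('entry_id', str),
    ('entry_name', int),
    ('user_id', str),
    ('user_entry', str),
    ('document', str),
])
coders.registry.register_coder(TestSchema, coders.RowCoder)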
According to the documentation, you can use beam.Row in this case. For example:
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.testing.test_pipeline import TestPipeline


def test_pipeline(self):
    with TestPipeline() as p:
        inputs = {
            'entry_id': 'ffff',
            'entry_name': 4,
            'user_id': 'ddddd',
            'user_entry': 'dDDFFF',
            'document': 'VVBBB'
        }
        entries = (p
                   | 'Inputs' >> beam.Create([inputs])
                   | 'ToRows' >> beam.Map(lambda user_entry: beam.Row(
                       entry_id=user_entry['entry_id'],
                       entry_name=user_entry['entry_name'],
                       user_id=user_entry['user_id'],
                       user_entry=user_entry['user_entry'],
                       document=user_entry['document'])))

        df = to_dataframe(entries)
        # Deferred DataFrames can also be converted back to schema'd PCollections
        collection = to_pcollection(df, include_indexes=False)
        collection | 'Print' >> beam.Map(print)
In short: read from BigQuery, use beam.Row in a Map to add a schema to the PCollection, then convert the PCollection to a DataFrame.
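Adapted to your pipeline, the whole flow might look roughly like the sketch below. This is only a sketch: it assumes your query returns exactly the five columns from your schema, and the .str.upper() call on user_entry is just a placeholder for whatever column update you actually need.

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline(argv=argv) as p:
    entries = (p
               | 'read_bq_view' >> beam.io.ReadFromBigQuery(
                   query=BIGQUERY_SELECT_QUERY, use_standard_sql=True)
               # ReadFromBigQuery yields plain dicts; beam.Row builds a
               # schema'd element from the keyword arguments.
               | 'ToRows' >> beam.Map(lambda x: beam.Row(
                   entry_id=x['entry_id'],
                   entry_name=x['entry_name'],
                   user_id=x['user_id'],
                   user_entry=x['user_entry'],
                   document=x['document'])))

    # Update columns with pandas-style operations on the deferred DataFrame.
    df = to_dataframe(entries)
    df['user_entry'] = df['user_entry'].str.upper()  # placeholder update

    # Convert back to a schema'd PCollection before writing it out.
    result = to_pcollection(df, include_indexes=False)
    result | 'Print' >> beam.Map(print)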