I am quite new to apache beam and I am wondering how to infer schema to a pcollection using namedtuple.
The example from the documentation Programming Guide states:
class Transaction(typing.NamedTuple):
bank: str
purchase_amount: float
pc = input | beam.Map(lambda ...).with_output_types(Transaction)
I tried to implement similar thing but reading from a parquet file first
from apache_beam import coders
from typing import NamedTuple
import apache_beam as beam
class TestSchema(NamedTuple):
company_id: int
is_company: bool
company_created_datetime: str
company_values: str
if __name__ == '__main__':
coders.registry.register_coder(TestSchema, coders.RowCoder)
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet").with_output_types(TestSchema) \
| "Print" >> beam.Map(print)
pipeline.run().wait_until_finish()
And I am getting AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
Also without the .with_output_types(TestSchema)
I can see the data fine which looks like this
{'company_id': 3, 'is_company': True, 'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511), 'company_values': 'test value'}
I am using python 3.8 and beam 2.37.0
Am I missing something? any help would be appreciated (stack trace below).
Traceback (most recent call last):
File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 817, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 826, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1206, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 836, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 1610, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
Ok after some research on beam schema and digging in the source code I finally found the solution. It looks like you need to convert every single value in the pcollection to NamedTuple and later apply a type hint.
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet") \
| "Transform to NamedTuple" beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema) \
| "Print" >> beam.Map(print)