I am working on a simple Apache Beam pipeline using Python to process a text file and output a CSV. Below is my code:
python
Copy code
import apache_beam as beam
p1 = beam.Pipeline()
attendance_count = (
p1
| beam.io.ReadFromText("dept_data.txt", validate=True)
| beam.Map(lambda x: x.split(","))
| beam.Filter(lambda x: x[3] == "Accounts")
| beam.Map(lambda x: (x[1], 1))
| beam.CombinePerKey(sum)
| beam.Map(lambda x: f"{x[0]},{x[1]}")
| beam.io.WriteToCsv("output/dept_op_data.csv", num_shards=1)
)
p1.run()
When I try to run this pipeline, I get the following error:
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
Full Traceback:
Traceback (most recent call last):
File "/path/to/your/script.py", line 7, in <module>
p1
...
File "/opt/anaconda3/envs/beam/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 610, in schema_from_element_type
raise TypeError(
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
What I've Tried:
Question: How can I fix this issue? Do I need to make the pipeline schema-aware, or is there a simpler fix for this error? Any guidance would be greatly appreciated.
You should be able to change:
| beam.Map(lambda x: f"{x[0]},{x[1]}")
to
| beam.Map(lambda x: f"{x[0]},{x[1]}").with_output_types(str)
this error is coming because Beam is not able to automatically infer the output type of your map stage, so it is not able to convert it to a schema'd element.