
Apache Beam: TypeError: Could not determine schema for type hint Any


I am working on a simple Apache Beam pipeline using Python to process a text file and output a CSV. Below is my code:

import apache_beam as beam

p1 = beam.Pipeline()

attendance_count = (
    p1
    | beam.io.ReadFromText("dept_data.txt", validate=True)
    | beam.Map(lambda x: x.split(","))
    | beam.Filter(lambda x: x[3] == "Accounts")
    | beam.Map(lambda x: (x[1], 1))
    | beam.CombinePerKey(sum)
    | beam.Map(lambda x: f"{x[0]},{x[1]}")
    | beam.io.WriteToCsv("output/dept_op_data.csv", num_shards=1)
)

p1.run()

When I try to run this pipeline, I get the following error:

TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
Full Traceback:
Traceback (most recent call last):
  File "/path/to/your/script.py", line 7, in <module>
    p1
  ...
  File "/opt/anaconda3/envs/beam/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 610, in schema_from_element_type
    raise TypeError(
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas

What I've Tried:

  1. Checked the validate=True argument of ReadFromText.
  2. Revisited my usage of Map and CombinePerKey.
  3. Looked into Beam's schema-aware transformations but couldn't figure out how to integrate them in my pipeline.

Question: How can I fix this issue? Do I need to make the pipeline schema-aware, or is there a simpler fix for this error? Any guidance would be greatly appreciated.


Solution

  • You should be able to change:

    | beam.Map(lambda x: f"{x[0]},{x[1]}")
    

    to

    | beam.Map(lambda x: f"{x[0]},{x[1]}").with_output_types(str)
    

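    This error occurs because WriteToCsv requires a schema-aware PCollection, and Beam cannot infer the output type of the lambda in your last Map stage. The element type falls back to Any, so Beam cannot derive a schema for the elements. Declaring the output type explicitly with with_output_types(str) gives Beam a concrete element type to work with.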