I have a stateful DoFn with a beam.pvalue.AsSingleton
side input. While trying to write a test for this DoFn, I noticed some strange behavior: sometimes the execution fails with a ValueError
stating that the stateful DoFn requires key-value pairs as input, i.e.
Traceback (most recent call last):
File "/opt/playground/backend/executable_files/afa71397-d19b-4088-8d4b-85d16735631e/afa71397-d19b-4088-8d4b-85d16735631e.py", line 38, in <module>
dot = pipeline_graph.PipelineGraph(pipeline).get_dot()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/interactive/display/pipeline_graph.py", line 78, in __init__
pipeline, pipeline._options)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_instrument.py", line 72, in __init__
pipeline.to_runner_api(), pipeline.runner, options)
File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 913, in to_runner_api
root_transform_id = context.transforms.get_id(self._root_transform())
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 104, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1304, in to_runner_api
for part in self.parts
File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1304, in <listcomp>
for part in self.parts
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 104, in get_id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1291, in to_runner_api
transform_spec = transform_to_runner_api(self.transform, context)
File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1286, in transform_to_runner_api
named_inputs=self.named_inputs())
File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 742, in to_runner_api
urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs) # type: ignore[call-arg]
File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1429, in to_runner_api_parameter
extra_kwargs.get('named_inputs', None))
File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1393, in _get_key_and_window_coder
'key-value pairs.' % self)
ValueError: Input elements to the transform <ParDo(PTransform) label=[ParDo(StatefulWithSideInput)] side_inputs=[AsSingleton(PCollection[side/Map(decode).None])]> with stateful DoFn must be key-value pairs.
which is quite confusing, since I clearly provide key-value pairs as input; only the side input is not a key-value pair. The actual logic within the DoFn seems to be irrelevant, since this MWE triggers the same issue:
import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.coders import StrUtf8Coder


class StatefulWithSideInput(beam.DoFn):
    MYSTATE = ReadModifyWriteStateSpec('my_state', StrUtf8Coder())

    def process(self,
                element=beam.DoFn.ElementParam,
                side=beam.DoFn.SideInputParam,
                state=beam.DoFn.StateParam(MYSTATE)):
        yield element[1]


with beam.Pipeline() as pipeline:
    side_input = (
        pipeline
        | "side" >> beam.Create(['test'])
    )
    main = (
        pipeline
        | "main" >> beam.Create([('1', 2)])
    )
    (
        main
        | beam.ParDo(StatefulWithSideInput(), beam.pvalue.AsSingleton(side_input))
        | beam.Map(print)
    )
Most of the time the code works without any errors and returns the expected result 2
, but sporadically I get the ValueError
. My actual DoFn is more complex (e.g., it uses the side input and the state variable) and runs perfectly fine on Dataflow (no errors there). I first encountered this issue while writing a test for that DoFn.
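For context, the test I am trying to write looks roughly like the following sketch, which uses Beam's TestPipeline together with assert_that; the test name and the asserted output are placeholders for my actual checks, and it reuses the StatefulWithSideInput DoFn from the MWE above.
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def test_stateful_with_side_input():
    with TestPipeline() as pipeline:
        side_input = pipeline | "side" >> beam.Create(['test'])
        main = pipeline | "main" >> beam.Create([('1', 2)])
        result = (
            main
            | beam.ParDo(StatefulWithSideInput(), beam.pvalue.AsSingleton(side_input))
        )
        # With the MWE's DoFn, the output is the value of the single key-value pair.
        assert_that(result, equal_to([2]))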
Is this a DirectRunner issue/bug? Am I doing something wrong when providing input to StatefulWithSideInput
? But then again, why does it work most of the time?
I've finally come up with a solution. While it might not fix the underlying issue (if there is one with the DirectRunner), at least the unit tests can be run consistently without any weird errors.
You need to tell Beam
the output type of your side input (whatever it is in your specific case), i.e.
side_input = (
    pipeline
    | "side" >> beam.Create(['test']).with_output_types(str)
)
See here for the full example, runnable in the Apache Beam Playground.
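For completeness, here is a sketch of the full example with the fix applied; it is simply the MWE from the question plus the with_output_types call on the side input.
import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.coders import StrUtf8Coder


class StatefulWithSideInput(beam.DoFn):
    MYSTATE = ReadModifyWriteStateSpec('my_state', StrUtf8Coder())

    def process(self,
                element=beam.DoFn.ElementParam,
                side=beam.DoFn.SideInputParam,
                state=beam.DoFn.StateParam(MYSTATE)):
        yield element[1]


with beam.Pipeline() as pipeline:
    # Declaring the output type of the side input is the workaround for the
    # sporadic "must be key-value pairs" ValueError on the DirectRunner.
    side_input = (
        pipeline
        | "side" >> beam.Create(['test']).with_output_types(str)
    )
    main = (
        pipeline
        | "main" >> beam.Create([('1', 2)])
    )
    (
        main
        | beam.ParDo(StatefulWithSideInput(), beam.pvalue.AsSingleton(side_input))
        | beam.Map(print)
    )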