I am using a stateful and timely DoFn to process data 2 seconds after the end of the fixed window I am implementing.
I have tested a reproducible example of my code inside the Apache Beam playground. My data is in KV[str, str]
format as an input to the DoFn. The only difference I can think of between the playground and my code is that the playground uses a DirectRunner and I am using a DataflowRunner.
Before my stateful DoFn I have another DoFn that transforms the input PCollection into the format the stateful DoFn expects:
```python
class AddKeys(beam.DoFn):
    def __init__(self, settings):
        self.settings = settings

    def process(self, element):
        data = element["data"]
        for setting in self.settings:
            if setting["data"] == data:
                yield [
                    ("tuple key 1", setting["tuple key 1"]),
                    ("tuple key 2", setting["tuple key 2"]),
                    ("tuple key 3", setting["tuple key 3"]),
                    ("element", str(element)),
                ]
```
Then my stateful DoFn should take the output and process it:
```python
class ProcessCollection(beam.DoFn):
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
    BUFFER_STATE = BagStateSpec('buffer', ListCoder(StrUtf8Coder()))

    def process(self, element,
                timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
                window=beam.DoFn.WindowParam,
                buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        timer.set(window.end + Duration(seconds=2))
        buffer.add(str(element))

    @on_timer(EXPIRY_TIMER)
    def expiry(self, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        events = buffer.read()
        for event in events:
            yield ''.join(event)
        buffer.clear()
```
The pipeline code calling the DoFns:
```python
# continue with processing & branch to stateful DoFn
extra_processing = (
    raw_data_processing
    | "Add Group Keys"
    >> beam.Map(lambda message: add_group_key(message, SETTINGS))
    | "Fixed Window"
    >> beam.WindowInto(
        window.FixedWindows(self.window_length),
        # if a message is late by up to 700 ms, still accept it
        allowed_lateness=window.Duration(seconds=0.7),
    )
    | "Group" >> beam.GroupByKey()
    | "Process Further" >> beam.ParDo(OtherDoFn(SETTINGS, CONFIG))
)

# process data with stateful DoFn
(
    extra_processing
    | "Add Keys" >> beam.ParDo(AddKeys(SETTINGS)).with_output_types(KV[str, str])
    | "Process Collection" >> beam.ParDo(ProcessCollection())
    | "Log" >> beam.LogElements(with_timestamp=True)
)
```
The error I receive from Google Cloud is:
```
File "/usr/local/lib/python3.10/site-packages/apache_beam/coders/coders.py", line 429, in encode
    return value.encode('utf-8')
AttributeError: 'tuple' object has no attribute 'encode' [while running 'Add Keys-ptransform-51']
```
The full stack trace is in a pastebin due to its size.
Can anyone determine why this might be occurring?
You are right in your assumption about why it works with the DirectRunner: that runner does not actually encode anything.

I am not familiar with `KV[...]`, since I have never used that function and/or heard of it, so I am not sure what you are telling Beam to encode with it. Nevertheless, I have encountered your error before, and it is definitely caused by a mismatch between the argument of `with_output_types` (or `with_input_types`, for that matter) and what you are actually returning.

To fix your problem, you need to add the correct type hints, though I am not sure what they are in your case, since you are returning a list of tuples of strings. In any case, you can verify that this is indeed causing the issue by starting the pipeline without the `with_output_types` call - then Dataflow should stop showing the error.
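To see why the coder blows up, here is a minimal sketch (not Beam itself, just a mimic of what its `StrUtf8Coder.encode` does per the stack trace): it assumes the value is a `str` and calls `.encode('utf-8')` on it, which fails on a tuple.

```python
# Mimic of the failing line from apache_beam/coders/coders.py:
# the coder assumes `value` is a str and calls .encode('utf-8').
def str_utf8_encode(value):
    return value.encode("utf-8")

# A plain string encodes fine:
print(str_utf8_encode("hello"))  # b'hello'

# But the DoFn above yields lists of tuples, and a tuple has no .encode:
try:
    str_utf8_encode(("tuple key 1", "value"))
except AttributeError as e:
    print(e)  # 'tuple' object has no attribute 'encode'
```

This is exactly the `AttributeError` from Dataflow: the declared output type picks a string coder, but the actual elements are not strings.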
Edit:

Stateful DoFns work per key (and window). If you want to buffer (and eventually join, as defined in your `ProcessCollection`) all of the incoming data, then you need to create a dummy key. This can be anything (int, bool, str); you just have to make sure it is the same for every data point, e.g.
```python
yield (
    "my_dummy_key",
    [
        ("tuple key 1", setting["tuple key 1"]),
        ("tuple key 2", setting["tuple key 2"]),
        ("tuple key 3", setting["tuple key 3"]),
        ("element", str(element)),
    ],
)
```
By doing so, the whole list will always be added to your buffer.
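Note that this new element shape no longer matches `KV[str, str]`, so the type hint must change too. A sketch, assuming standard `typing` hints (which the Python SDK accepts) and a hypothetical `build_element` helper mirroring the yield above:

```python
from typing import List, Tuple

# Shape now yielded by AddKeys: a (key, payload) pair, where the
# payload is the list of (str, str) tuples.
ElementType = Tuple[str, List[Tuple[str, str]]]

def build_element(setting, element):
    # hypothetical helper mirroring the yield above
    return (
        "my_dummy_key",
        [
            ("tuple key 1", setting["tuple key 1"]),
            ("element", str(element)),
        ],
    )

key, payload = build_element({"tuple key 1": "a"}, {"data": "x"})
# In the pipeline, the hint would then be e.g.
# .with_output_types(ElementType) instead of KV[str, str].
```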
Alternatively, if you want to join only the settings which belong to a specific key, you need to rewrite your `AddKeys` and make the key dynamic:
```python
yield (
    # the dynamic-key method needs to be tailored to your use case / data
    self.get_dynamic_key(),
    setting[self.get_dynamic_key()],
)
```
This results in `n` different buffers (where `n` is the number of dynamic keys), and only elements belonging to the same key are then buffered and joined within `ProcessCollection`.
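The per-key buffering can be pictured outside of Beam with a plain dictionary. A sketch, with a hypothetical `get_dynamic_key` that derives the key from the element's own data:

```python
from collections import defaultdict

# Hypothetical dynamic key: derive it from the element's data field so
# that elements with the same data value land in the same buffer.
def get_dynamic_key(element):
    return element["data"]

events = [
    {"data": "sensor-a", "value": "1"},
    {"data": "sensor-b", "value": "2"},
    {"data": "sensor-a", "value": "3"},
]

# Each distinct key gets its own buffer, like Beam's per-key state:
buffers = defaultdict(list)
for event in events:
    buffers[get_dynamic_key(event)].append(str(event))

print(len(buffers))  # 2 buffers: one for "sensor-a", one for "sensor-b"
```

In the real pipeline, Beam maintains one `BagState` per key (and window) for you; the dictionary above only illustrates the partitioning.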