I'm using Apache Beam on Google Cloud Dataflow, and I'm trying to use a sliding window to output duplicate elements every 5 seconds, in preparation for a join.
import logging

import apache_beam as beam

def logger_helper(r):
    logging.getLogger().info(r)
    return r

window = (
    records
    | beam.Map(lambda r: (r['key'], r))
    | "Page View Window" >> beam.WindowInto(beam.window.SlidingWindows(900, 5))
    | "Print Page View Window" >> beam.Map(lambda r: logger_helper(r))
)
What I expected: each key would be logged every 5 seconds for 900 seconds (15 minutes). However, I only see a key logged once and never again. How do I get the same key to show up every 5 seconds?
Your windowing is not applied. According to the documentation:
Window grouping occurs on an as-needed basis. If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key.
So try adding a GroupByKey or Combine step to your pipeline (before the print).
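As a minimal sketch of that fix, assuming records is the same keyed PCollection of dicts from the question (the "Group Per Window" label is just illustrative):

import logging

import apache_beam as beam

def logger_helper(r):
    logging.getLogger().info(r)
    return r

window = (
    records
    | beam.Map(lambda r: (r['key'], r))
    | "Page View Window" >> beam.WindowInto(beam.window.SlidingWindows(900, 5))
    # GroupByKey (or a Combine) is what actually makes the window assignment take effect
    | "Group Per Window" >> beam.GroupByKey()
    | "Print Page View Window" >> beam.Map(lambda kv: logger_helper(kv))
)

With SlidingWindows(900, 5) each element is assigned to 900 / 5 = 180 overlapping windows, so after the GroupByKey each key is emitted once per window; in a streaming pipeline with the default trigger that should be roughly one output per key every 5 seconds as successive windows close.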