Tags: python, apache-flink, google-cloud-dataflow, apache-beam

Assign PCollection back to global window


I have a pipeline that takes a bounded PCollection, assigns timestamps to its elements, and windows it into sliding windows. After a grouping transform, I want to assign the resulting PCollection back to the global window, but I have not been able to figure out how. See the sample Beam pseudo-code below:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.io.ReadFromText()
        | beam.ParDo(AddTimestampDoFn())
        | beam.WindowInto(beam.window.SlidingWindows(60, 60))
        | beam.GroupByKey()
        | beam.ParDo(SomethingElse())
        | beam.WindowInto(GlobalWindow()) # Here is where I want to bring back to global window
    )

Any ideas on how to go about it?


Solution

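  • Using beam.WindowInto(window.GlobalWindows()) should work. Note the plural: GlobalWindows is the WindowFn that WindowInto expects, while GlobalWindow is the window object itself. For example, with this quick test: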

    import logging
    import time
    import apache_beam as beam
    from apache_beam import window

    logging.getLogger().setLevel(logging.INFO)  # make logging.info output visible
    data = [{'message': 'Hi', 'timestamp': time.time()}]

    with beam.Pipeline() as p:
      events = (p
        | 'Create Events' >> beam.Create(data)
        | 'Add Timestamps' >> beam.Map(lambda x: window.TimestampedValue(x, x['timestamp']))
        | 'Sliding Windows' >> beam.WindowInto(window.SlidingWindows(60, 60))
        | 'First window' >> beam.ParDo(DebugPrinterFn())
        | 'Global Window' >> beam.WindowInto(window.GlobalWindows())  # back to the global window
        | 'Second window' >> beam.ParDo(DebugPrinterFn()))
    

    where DebugPrinterFn logs each element's message together with its window:

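    import logging
    import apache_beam as beam

    class DebugPrinterFn(beam.DoFn):
      """Logs the element's message and its window, then passes the element through."""
      def process(self, element, window=beam.DoFn.WindowParam):
        # WindowParam makes Beam inject the window this element belongs to.
        logging.info("Received message %s in window=%s", element['message'], window)
        yield element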
    

    I get the following output:

    INFO:root:Received message Hi in window=[1575565500.0, 1575565560.0)
    INFO:root:Received message Hi in window=GlobalWindow
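The bounds in the first log line come from how SlidingWindows aligns windows. Each window starts at a multiple of the period (plus an optional offset, 0 by default), and an element belongs to every window of length `size` that covers its timestamp. The pure-Python sketch below mirrors that arithmetic for illustration only. `sliding_windows_for` is a hypothetical helper, not part of the Beam API, and it uses plain float seconds instead of Beam `Timestamp` objects.

```python
def sliding_windows_for(timestamp, size, period, offset=0.0):
    """Hypothetical helper: the [start, end) windows containing `timestamp`, newest first."""
    # Latest window start at or before the timestamp, aligned to the period and offset.
    start = timestamp - ((timestamp - offset) % period)
    windows = []
    # Step back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


print(sliding_windows_for(1575565523.25, 60, 60))   # [(1575565500.0, 1575565560.0)]
print(sliding_windows_for(1575565523.25, 120, 60))  # [(1575565500.0, 1575565620.0), (1575565440.0, 1575565560.0)]
```

With SlidingWindows(60, 60) the period equals the size, so each element lands in exactly one window (the same behavior as FixedWindows(60)). With a 120-second size and a 60-second period, it would land in two.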
    

    Tested with the DirectRunner and the 2.16.0 Python SDK. If it does not work for you, please let me know:

    • Do you get any error?
    • Which runner and SDK are you using?

    Full code here
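In the question's pipeline, the GroupByKey runs while the data is still in sliding windows. It helps to know that WindowInto(GlobalWindows()) only reassigns windows and does not merge or deduplicate anything. The pure-Python model below is a simplified sketch. It ignores timestamps, triggers and watermarks, and the helper names and the "GLOBAL" marker are hypothetical, not Beam APIs. An element that falls into two overlapping sliding windows produces two grouped results. Both copies survive into the global window, where a later GroupByKey (on bounded input with the default trigger) collects them under a single key.

```python
from collections import defaultdict


def group_by_key_per_window(elements):
    """Hypothetical model of GroupByKey: (key, value, windows) -> {(key, window): [values]}."""
    groups = defaultdict(list)
    for key, value, windows in elements:
        # An element assigned to several overlapping windows is grouped in each of them.
        for w in windows:
            groups[(key, w)].append(value)
    return dict(groups)


def rewindow_to_global(grouped):
    """Hypothetical model of WindowInto(GlobalWindows()): every per-window result is kept."""
    return [(key, values, ["GLOBAL"]) for (key, _window), values in grouped.items()]


# Two events for key "a", each in two overlapping 120 s windows (period 60 s).
events = [
    ("a", 1, [(0, 120), (60, 180)]),
    ("a", 2, [(60, 180), (120, 240)]),
]
per_window = group_by_key_per_window(events)
regrouped = group_by_key_per_window(rewindow_to_global(per_window))
print(per_window)  # {('a', (0, 120)): [1], ('a', (60, 180)): [1, 2], ('a', (120, 240)): [2]}
print(regrouped)   # {('a', 'GLOBAL'): [[1], [1, 2], [2]]}
```

In this model, each input value appears twice after returning to the global window. With non-overlapping windows, such as SlidingWindows(60, 60) above, that duplication would not occur.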