python google-cloud-platform google-cloud-dataflow apache-beam windowing

How to calculate the number of elements of a PCollection in Apache Beam


number_items = lines | 'window' >> beam.WindowInto(window.GlobalWindows()) \
    | 'CountGlobally' >> beam.combiners.Count.Globally() \
    | 'print' >> beam.ParDo(PrintFn())

I tried to display the result with print statements and logging, but nothing shows up.

class PrintFn(beam.DoFn):
    def process(self, element):
        print(element)
        logging.error(element)
        return [element]

Solution

  • For batch pipelines, you can simply do:

    def print_row(element):
        # print() is a function in Python 3
        print(element)

    count_pcol = (
        lines
        | 'Count elements' >> beam.combiners.Count.Globally()
        | 'Print result' >> beam.Map(print_row)
    )
    

    beam.combiners.Count.Globally() is a PTransform that uses global combine to count all the elements of a PCollection and produce a single value.
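To make "global combine" a little more concrete, here is a minimal plain-Python sketch of the idea. It needs no Beam installation. The class `CountCombine` and the helper `count_globally` are hypothetical names made up for illustration; they only mirror the four methods a Beam `CombineFn` defines (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`) and are not how Beam itself is implemented. The "bundles" stand in for chunks of the PCollection that might be processed on different workers.

```python
class CountCombine:
    """Hypothetical stand-in that mirrors the four steps of a Beam CombineFn."""

    def create_accumulator(self):
        # Each bundle starts counting from zero.
        return 0

    def add_input(self, accumulator, element):
        # The element's value does not matter; only its presence is counted.
        return accumulator + 1

    def merge_accumulators(self, accumulators):
        # Partial counts from different bundles are added together.
        return sum(accumulators)

    def extract_output(self, accumulator):
        # The final accumulator is the single output value.
        return accumulator


def count_globally(bundles, fn=None):
    """Count the elements across all bundles and return one value."""
    fn = fn or CountCombine()
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


# Made-up input: four bundles, one of them empty.
bundles = [["a", "b"], ["c"], [], ["d", "e", "f"]]
total = count_globally(bundles)
print(total)  # 6
```

The point of the sketch is that the final value only exists once every bundle has been merged, which is why a bounded input yields exactly one count.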


    For streaming, the source is an unbounded PCollection, i.e. it never ends, so a single global count cannot be produced the same way. With the global window and the default trigger, CombineGlobally in your case keeps waiting for more input and never produces an output.

    A possible solution could be to set a window function and a non-default trigger.

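    I have written a simple pipeline that divides elements into fixed windows of 20 seconds and counts per key for each window. Because every element is given the same key (1), the per-key count is the number of elements in that window. You can change the window and trigger based on your requirements.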

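    from apache_beam.transforms import window
    from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

    def form_pair(data):
        # Give every element the same key so the per-key count covers all elements
        return 1, data

    def print_row(element):
        print(element)

    # p (the Pipeline) and input_subscription are assumed to be defined elsewhere
    count_pcol = (
        p
        | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | 'Form key value pair' >> beam.Map(form_pair)
        | 'Apply windowing and triggers' >> beam.WindowInto(
            window.FixedWindows(20),
            trigger=AfterProcessingTime(5),
            accumulation_mode=AccumulationMode.DISCARDING)
        | 'Count elements by key' >> beam.combiners.Count.PerKey()
        | 'Print result' >> beam.Map(print_row)
    )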
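
To see what the 20-second fixed windows in the pipeline above do to the elements, here is a small plain-Python sketch that buckets timestamped events into windows and counts each bucket. It is only a rough model: it ignores triggers, watermarks, and late data, and the helper names (`window_start`, `count_per_window`) and the sample events are made up for illustration. It assumes windows are aligned to timestamp 0, as fixed windows with no offset would be.

```python
from collections import Counter

WINDOW_SIZE = 20  # seconds, matching FixedWindows(20) in the pipeline above


def window_start(timestamp, size=WINDOW_SIZE):
    """Return the start of the fixed window that contains the timestamp."""
    return timestamp - (timestamp % size)


def count_per_window(timestamped_elements, size=WINDOW_SIZE):
    """Count elements per [start, start + size) window."""
    counts = Counter()
    for timestamp, _element in timestamped_elements:
        counts[window_start(timestamp, size)] += 1
    return dict(sorted(counts.items()))


# Made-up (timestamp_in_seconds, payload) pairs.
events = [(1, "a"), (5, "b"), (19, "c"), (20, "d"), (41, "e"), (59, "f")]
window_counts = count_per_window(events)
print(window_counts)  # {0: 3, 20: 1, 40: 2}
```

Each window closes after a finite span, so a count per window can be emitted even though the overall stream never ends. A window that receives no elements simply has no entry in the result.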