number_items = (
    lines
    | 'window' >> beam.WindowInto(window.GlobalWindows())
    | 'CountGlobally' >> beam.combiners.Count.Globally()
    | 'print' >> beam.ParDo(PrintFn())
)
I tried to display the result via prints and logs, but I found nothing in the output:
class PrintFn(beam.DoFn):
    def process(self, element):
        print(element)
        logging.error(element)
        return [element]
For Batch, you can simply do
def print_row(element):
    print(element)

count_pcol = (
    lines
    | 'Count elements' >> beam.combiners.Count.Globally()
    | 'Print result' >> beam.Map(print_row)
)
beam.combiners.Count.Globally() is a PTransform that performs a global combine to count all the elements of a PCollection and produce a single value.
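If you want to try this end to end, here is a minimal, runnable sketch. It assumes the default direct runner and uses a small in-memory PCollection built with beam.Create as a stand-in for your lines input:

import apache_beam as beam

with beam.Pipeline() as p:  # defaults to the direct runner
    count = (
        p
        | 'Create elements' >> beam.Create(['a', 'b', 'c', 'd'])
        | 'Count elements' >> beam.combiners.Count.Globally()
        | 'Print result' >> beam.Map(print)  # prints 4, the single combined value
    )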
For Streaming, counting elements this way is not possible because the source is an unbounded PCollection, i.e. it never ends. CombineGlobally in your case will keep waiting for input and never produce an output.
A possible solution is to set a windowing function and a non-default trigger.
I have written a simple pipeline that divides elements into fixed windows of 20 seconds and counts elements per key for each window. You can change the window and trigger based on your requirements.
def form_pair(data):
    return 1, data

def print_row(element):
    print(element)

count_pcol = (
    p
    | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
    | 'Form key value pair' >> beam.Map(form_pair)
    | 'Apply windowing and triggers' >> beam.WindowInto(
          window.FixedWindows(20),
          trigger=AfterProcessingTime(5),
          accumulation_mode=AccumulationMode.DISCARDING)
    | 'Count elements by key' >> beam.combiners.Count.PerKey()
    | 'Print result' >> beam.Map(print_row)
)
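To actually run that snippet you also need the corresponding imports and a pipeline created with streaming enabled. A sketch of that boilerplate is below; the subscription path is a placeholder you have to replace with your own:

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub is an unbounded source
p = beam.Pipeline(options=options)

# Placeholder; replace with your own subscription path
input_subscription = 'projects/<PROJECT>/subscriptions/<SUBSCRIPTION>'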