My problem is that the logs for my Dataflow job do not display anything (the Monitoring API is enabled), and I have no idea why.
With the following Apache Beam code (adapted from https://cloud.google.com/dataflow/docs/guides/logging),
import argparse
import logging
import re

from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam import FlatMap, Map, Pipeline


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="gs://dataflow-samples/shakespeare/kinglear.txt",
        help="Input file to process.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with Pipeline(options=pipeline_options) as p:
        filtered_words = (
            p
            | "Read" >> ReadFromText(known_args.input)
            | "Split" >> FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x))
            | "Log" >> Map(lambda x: logging.info(f"x: {x}"))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
Running locally with the direct runner yields:
...
INFO:root:x: his
INFO:root:x: enemy
INFO:root:x: king
INFO:root:x: and
INFO:root:x: did
INFO:root:x: him
...
Running on Google Cloud Dataflow, however, yields no log output at all.
Here are the dependencies:
python = "^3.8"
apache-beam = {extras = ["gcp"], version = "^2.28.0"}
It turns out that the default sink in the Logs Router was excluding the Dataflow logs.
Creating a new sink in the Logs Router with the inclusion filter resource.type="dataflow_step" fixes the problem.
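
For reference, here is a minimal sketch of what such a sink definition could look like, written as the name/destination/filter fields that a Cloud Logging sink carries. Only the filter comes from the fix above. The sink name "dataflow-logs", the project ID "my-project", and the choice of the _Default log bucket in the global location as the destination are assumptions for illustration, so adjust them to your project.

```python
import json

# Inclusion filter from the fix above: matches log entries written by Dataflow steps.
DATAFLOW_FILTER = 'resource.type="dataflow_step"'


def build_sink_body(sink_name, project_id, bucket_id="_Default", location="global"):
    """Return a dict with the name, destination, and filter of a log sink.

    sink_name and project_id are hypothetical placeholders. The destination
    assumes the logs should be routed to a Cloud Logging bucket in the same
    project.
    """
    destination = (
        f"logging.googleapis.com/projects/{project_id}"
        f"/locations/{location}/buckets/{bucket_id}"
    )
    return {
        "name": sink_name,
        "destination": destination,
        "filter": DATAFLOW_FILTER,
    }


if __name__ == "__main__":
    # Print the definition so it can be compared with what the Logs Router shows.
    print(json.dumps(build_sink_body("dataflow-logs", "my-project"), indent=2))
```

The same three values can be entered on the Logs Router page in the Cloud Console, or passed to gcloud logging sinks create with its --log-filter flag. It is also worth checking the exclusion filters on the existing default sink, since that is where the Dataflow logs were being dropped in my case.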