I'm writing a data validation script in Apache Beam. Whenever a new file is uploaded to Google Cloud Storage, the script receives a message from Pub/Sub, downloads the file, and runs a battery of predefined tests against it. At the end of these tests I need to email a log of all the rows that failed their tests.
To avoid sending the email multiple times, I did some reading and believe I can send it once using the state and timer constructs in Beam. However, each file will have a different number of errors. How do I make the email step wait for X elements, where each element is an error and X varies per file, rather than for a hard-coded number?
I tried to use a DoFn with COUNT_STATE to count the elements passed to it, but I get an error saying that the element is a PCollection and not a (K, V) tuple.
Here is the pipeline code:
with beam.Pipeline(options=pipeline_options) as p:
    # Read lines from data
    validation = (p
        | "Read Element From PubSub" >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
        | 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
        | 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
        | 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
        | 'After GetSchema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
        | 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
            ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
            ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
            ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
            main='lines'))
    to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE],
                     validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
        | "Group By Key" >> beam.Flatten()
        | 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
        | 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
        | 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',
            project=known_args.project,
            schema=TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Basically I want to insert a step before writing to BigQuery to send out an email which only sends when VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE number of errors have been received.
Thanks!
The idea is that you want to perform a CoGroupByKey on the two PCollections containing the validation failures and then apply a DoFn that applies your email sending logic to the result.
It is unclear what the types within the pipeline are, but I'm going to assume that ValidateFn outputs a (file name, validation error) tuple to ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE and ValidateFn.TAG_VALIDATION_CONTENT_FAILURE.
class SendEmail(beam.DoFn):
    def process(self, element):
        # element is (file name, {tag: iterable of validation errors})
        file_name = element[0]
        iterable_of_global_failures = element[1].get(ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE)
        iterable_of_content_failures = element[1].get(ValidateFn.TAG_VALIDATION_CONTENT_FAILURE)
        # ... format and send e-mail if iterables satisfy requirements ...
validation = (p
    | "Read Element From PubSub" >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
    | 'WindowInto' >> beam.WindowInto(beam.window.FixedWindows(1))
    | ...
)
# Create a dict containing the tag to PCollection mapping for what we want to group together.
validation_errors = {key: validation[key] for key in [ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE, ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]}
(validation_errors
    | 'CoGroupByKey' >> beam.CoGroupByKey()
    | 'Send Email' >> beam.ParDo(SendEmail()))
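The elided "format and send e-mail" step in SendEmail could look roughly like the sketch below. It is only an illustration under the assumption stated above, namely that each grouped element is a (file name, {tag: iterable of errors}) pair. The helper name build_error_email, the two tag strings, and the addresses are hypothetical stand-ins, and the message is only built here, not sent.

```python
from email.message import EmailMessage

# Hypothetical tag names; in the pipeline these would be the
# ValidateFn.TAG_VALIDATION_* constants.
TAG_GLOBAL = "global_failure"
TAG_CONTENT = "content_failure"


def build_error_email(element, sender, recipient):
    """Build one email per grouped element, or return None if nothing failed."""
    file_name, grouped = element
    # Materialize the iterables so they can be counted and listed.
    global_failures = list(grouped.get(TAG_GLOBAL, []))
    content_failures = list(grouped.get(TAG_CONTENT, []))
    total = len(global_failures) + len(content_failures)
    if total == 0:
        return None

    lines = [f"{total} validation error(s) in {file_name}", ""]
    lines += [f"[global] {err}" for err in global_failures]
    lines += [f"[content] {err}" for err in content_failures]

    msg = EmailMessage()
    msg["Subject"] = f"Validation failed: {file_name}"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("\n".join(lines))
    return msg


# Example element shaped like the assumed CoGroupByKey output.
example = (
    "data/file_a.csv",
    {TAG_GLOBAL: ["missing header"],
     TAG_CONTENT: ["row 3: bad date", "row 9: empty id"]},
)
message = build_error_email(example, "validator@example.com", "team@example.com")
```

Inside SendEmail.process, the returned message could then be handed to whatever mail client you use. Because the count is taken from the grouped iterables, no expected number of errors has to be known in advance.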
Since each input record from PubsubIO represents a file name that is later expanded into all of its relevant records, those records all share the timestamp of the Pub/Sub message for that file. This allows us to use a really small window size when grouping, which leads to smaller groups and better performance. Specifying the WindowInto is necessary so that we aren't using the GlobalWindow, in which the CoGroupByKey, with the default trigger, would never emit output for an unbounded source. You can learn more about streaming, windowing and triggering [1, 2].
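To see why a one-second window is enough, here is a plain-Python illustration (not Beam itself) of the arithmetic behind fixed windows with a zero offset. The function name, file names, and timestamps are made up for the example.

```python
def fixed_window_start(timestamp_seconds, size_seconds):
    """Start of the fixed window [start, start + size) containing the timestamp."""
    return timestamp_seconds - (timestamp_seconds % size_seconds)


# (file name, validation error, timestamp inherited from the Pub/Sub message)
records = [
    ("a.csv", "err1", 100.25),
    ("a.csv", "err2", 100.25),
    ("b.csv", "err3", 103.5),
]

# Group errors by (file name, window start), as a keyed grouping per window would.
groups = {}
for file_name, error, ts in records:
    key = (file_name, fixed_window_start(ts, 1))
    groups.setdefault(key, []).append(error)
```

Both errors for a.csv carry the same timestamp, so they fall into the same window and are grouped together, while b.csv lands in its own window.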