I have a streaming Dataflow pipeline that writes to BigQuery, and I want to window all the failed rows and do some further analysis. The pipeline looks like this. I'm getting all the error messages in the second step, but everything gets stuck at the beam.GroupByKey(): nothing moves downstream after that. Does anyone have any idea how to fix this?
data = (
    pipeline  # the beam.Pipeline object
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(
        subscription=options.input_subscription,
        with_attributes=True)
    ...
    | "write to BQ" >> beam.io.WriteToBigQuery(
        table=f"{options.bq_dataset}.{options.bq_table}",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='STREAMING_INSERTS',
        insert_retry_strategy=beam.io.gcp.bigquery_tools.RetryStrategy.RETRY_NEVER)
)
(
    data[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | f"Window into: {options.window_size}m" >> GroupWindowsIntoBatches(options.window_size)
    | "Failed Rows" >> beam.ParDo(BadRows(options.bq_dataset, 'table'))
)
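(BadRows is my own DoFn for the follow-up analysis; roughly something like this sketch, where the deadletter table and the insert_rows_json call are simplified placeholders for what I actually do:)

import logging

import apache_beam as beam
from google.cloud import bigquery


class BadRows(beam.DoFn):
    """Sketch of the analysis step: dump each batch of failed rows
    into a separate deadletter table."""

    def __init__(self, bq_dataset, bq_table):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table

    def setup(self):
        # Create the client once per worker rather than per element.
        self.client = bigquery.Client()

    def process(self, batch):
        # `batch` is one window's list of failed rows; with STREAMING_INSERTS
        # each element is typically a (destination, row) tuple, so keep the rows.
        rows = [row for _, row in batch]
        table_id = f"{self.bq_dataset}.{self.bq_table}"
        errors = self.client.insert_rows_json(table_id, rows)
        if errors:
            logging.error("Deadletter insert failed: %s", errors)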
and the windowing transform:
class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each message based on its timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            # If the windowed elements do not fit into memory, consider
            # using `beam.util.BatchElements` instead.
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
Also, I don't know if it's relevant, but the beam.DoFn.TimestampParam inside my GroupWindowsIntoBatches has an invalid (negative) timestamp.
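For reference, this is roughly how I'm seeing those timestamps (LogTimestamp is just a throwaway debug DoFn I slotted in before the GroupByKey, not part of the real pipeline):

import logging

import apache_beam as beam


class LogTimestamp(beam.DoFn):
    """Throwaway debug DoFn: logs each element's event-time timestamp."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # For the FAILED_ROWS elements this prints a huge negative value,
        # i.e. the timestamp was never set.
        logging.info("element timestamp: %s", timestamp)
        yield element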
OK, so the issue was that the rows coming out of the BigQuery FAILED_ROWS output were not timestamped (hence the negative beam.DoFn.TimestampParam above), which is presumably why the windows never fired. Adding

| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, time.time()))

before the WindowInto seems to fix the group by.
import time

import apache_beam as beam
from apache_beam.transforms import window


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # <-- added this line: stamp each failed row with processing time.
            | 'Add Timestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, time.time()))
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )
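One more note: time.time() stamps the failed rows with processing time rather than any original publish time, which is fine here since I only care about batching them. Also, if your Beam version and runner support it, beam.GroupIntoBatches can replace the dummy-key GroupByKey dance; a sketch of the same expand() using it (the batch size of 500 is an arbitrary example):

    def expand(self, pcoll):
        return (
            pcoll
            | 'Add Timestamps' >> beam.Map(
                lambda x: beam.window.TimestampedValue(x, time.time()))
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            # Emits up to 500 values per key and window as one batch.
            | "Batch" >> beam.GroupIntoBatches(500)
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, batch: batch)
        )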