I have a streaming pipeline on Google Cloud Dataflow, where I define counters with Metrics.counter as below:
import org.apache.beam.sdk.metrics.Metrics
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

import scala.util.{Failure, Success}

// Element types are placeholders for the pipeline's actual types.
class SomeDoFn extends DoFn[String, String] {
  // Metrics.counter(namespace, name): reported per job to Dataflow/Stackdriver.
  private val validIdCounter = Metrics.counter("user-type", "valid_ids")
  private val invalidIdCounter = Metrics.counter("user-type", "invalid_ids")

  @ProcessElement
  def process(c: ProcessContext): Unit = {
    val userId = getId(c.element) match {
      case Success(id) =>
        validIdCounter.inc()
        Some(id)
      case Failure(e) =>
        invalidIdCounter.inc()
        None
    }
    ...
  }
}
I'm able to see the metrics in Stackdriver Monitoring and create alerts on them. But when I restart the pipeline, the metrics reset to zero. Is this the expected behaviour? Is there a way to preserve the metrics across jobs and job runs?
Yes, the counters resetting to zero is expected behaviour: Beam metrics are scoped to a single job, so every new Dataflow job, including a restarted pipeline, starts its counters from zero. To preserve the total metric count in Stackdriver, use count aggregation when charting the metric, so the time series from successive job runs are combined into one view.
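
If you need the final totals themselves, another option is to snapshot the counters from the finishing job and record them externally before the next run starts. Below is a minimal sketch using Beam's metrics query API (MetricsFilter / MetricNameFilter from the Java SDK); the MetricsSnapshot object and the idea of persisting the value are illustrative assumptions, not something the SDK does for you:

import org.apache.beam.sdk.PipelineResult
import org.apache.beam.sdk.metrics.{MetricNameFilter, MetricsFilter}

import scala.jdk.CollectionConverters._

object MetricsSnapshot {
  // Hypothetical helper: read the final value of the "valid_ids" counter
  // from a drained or finished job so it can be stored externally.
  def snapshotValidIds(result: PipelineResult): Long = {
    val filter = MetricsFilter.builder()
      .addNameFilter(MetricNameFilter.named("user-type", "valid_ids"))
      .build()
    result.metrics().queryMetrics(filter)
      .getCounters.asScala
      // Streaming runners may only report attempted (not committed) values;
      // attempted counts can be slightly high if bundles were retried.
      .map(_.getAttempted.longValue())
      .sum
  }
}

Calling this on the PipelineResult returned by pipeline.run(), after a drain completes, gives you the last counter value to carry forward however you like. The snapshot lives outside Stackdriver, though, so the chart-side aggregation above is the simpler choice if all you need is a combined view across runs.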