My code for handling my pipeline result looks like this (some parts snipped for brevity):
```java
PipelineResult result = pipeline.run();
switch (result.getState()) {
    case DONE: {
        handleDone();
        break;
    }
    case FAILED: {
        handleFailed();
        break;
    }
    case CANCELLED: {
        handleCancelled();
        break;
    }
    case UNKNOWN:
    case RUNNING:
    case STOPPED: {
        handleUnknownRunningStopped();
        break;
    }
    default: {
        assert false;
        throw new IllegalStateException();
    }
}
```
However, I've noticed that instead of returning a PipelineResult.State value such as FAILED or CANCELLED, an exception is thrown:

- for FAILED, a DataflowJobExecutionException is thrown
- for CANCELLED, a DataflowJobCancelledException is thrown

What is the correct way to handle the result of a pipeline programmatically?
Both Dataflow pipeline runners return a PipelineResult that lets you query the current state of the pipeline. The DataflowPipelineRunner returns the PipelineResult immediately upon job submission, whereas the BlockingDataflowPipelineRunner doesn't return it until the job has completed.
In addition, the BlockingDataflowPipelineRunner throws an exception if the job does not complete successfully. Since you've asked for a blocking run(), we assume you want to know if something goes wrong. So if you've hard-coded the blocking runner, relying on the exception is an easy way to handle failure.
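A minimal sketch of the exception-based approach with the blocking runner follows. So that it compiles without the Dataflow SDK, it uses hypothetical stand-in exception classes (`JobExecutionException`, `JobCancelledException`) and a `Runnable` in place of `pipeline.run()`; in real code you would catch the SDK's `DataflowJobExecutionException` and `DataflowJobCancelledException` around `run()` instead. If one of those SDK exception types extends the other in your SDK version, catch the more specific one first.

```java
public class BlockingResultHandling {
    // Hypothetical stand-ins for the SDK's DataflowJobExecutionException and
    // DataflowJobCancelledException, so this sketch compiles without the SDK.
    static class JobExecutionException extends RuntimeException {
        JobExecutionException(String message) { super(message); }
    }

    static class JobCancelledException extends RuntimeException {
        JobCancelledException(String message) { super(message); }
    }

    enum Outcome { DONE, FAILED, CANCELLED }

    // `runJob` plays the role of pipeline.run() on a blocking runner: it returns
    // only once the job has finished and throws if the job did not succeed.
    static Outcome runAndClassify(Runnable runJob) {
        try {
            runJob.run();
            return Outcome.DONE;       // e.g. call handleDone() here
        } catch (JobCancelledException e) {
            return Outcome.CANCELLED;  // e.g. call handleCancelled() here
        } catch (JobExecutionException e) {
            return Outcome.FAILED;     // e.g. call handleFailed() here
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndClassify(() -> { }));
        System.out.println(runAndClassify(() -> { throw new JobCancelledException("job cancelled"); }));
        System.out.println(runAndClassify(() -> { throw new JobExecutionException("job failed"); }));
    }
}
```

Running `main` prints `DONE`, `CANCELLED` and `FAILED`, one per line. A successful return means the job finished, so the DONE branch no longer needs a state check.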
Note that the code snippet you wrote uses the more general PipelineResult approach, but it won't work as intended with the non-blocking runner: that runner returns the result while the job is still running, so your switch will usually see a non-terminal state.
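With the non-blocking runner, one option is to poll `getState()` until it reports a terminal state and only then switch on it. The sketch below uses a hypothetical stand-in `Result` interface and `State` enum (mirroring the values in the question's switch) so it runs without the SDK; treating DONE, FAILED and CANCELLED as the terminal states is an assumption taken from that switch. In real code you would pass the `PipelineResult` returned by `run()`, and it is worth checking the SDK Javadoc for a built-in wait method on the job object before rolling your own loop.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.EnumSet;
import java.util.Set;

public class NonBlockingResultPolling {
    // Stand-in for PipelineResult.State; values mirror the question's switch.
    enum State { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED }

    // Stand-in for PipelineResult: only the state query is needed here.
    interface Result {
        State getState();
    }

    // Assumption of this sketch: these are the states in which the job is finished.
    static final Set<State> TERMINAL = EnumSet.of(State.DONE, State.FAILED, State.CANCELLED);

    // Polls getState() until a terminal state is seen or maxPolls extra polls
    // have been made; returns the last state observed either way.
    static State waitForTerminalState(Result result, long pollMillis, int maxPolls) {
        State state = result.getState();
        for (int i = 0; i < maxPolls && !TERMINAL.contains(state); i++) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and stop waiting
                return state;
            }
            state = result.getState();
        }
        return state;
    }

    public static void main(String[] args) {
        // Fake job that reports RUNNING twice, then DONE on every later query.
        Deque<State> states = new ArrayDeque<>(Arrays.asList(State.RUNNING, State.RUNNING, State.DONE));
        Result fake = () -> states.size() > 1 ? states.poll() : states.peek();
        System.out.println(waitForTerminalState(fake, 10, 100));
    }
}
```

Running `main` prints `DONE`. The returned state can then be fed into the original switch; if the poll limit runs out first, the state is still non-terminal and lands in the UNKNOWN/RUNNING/STOPPED branch.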