Tags: api, jobs, pipeline, google-cloud-dataflow

What is the correct way to get the result of Pipeline?


My code for handling the pipeline result looks like this (some parts snipped for brevity):

PipelineResult result = pipeline.run();
switch (result.getState()) {
  case DONE: {
    handleDone();
    break;
  }
  case FAILED: {
    handleFailed();
    break;
  }
  case CANCELLED: {
    handleCancelled();
    break;
  }
  case UNKNOWN:
  case RUNNING:
  case STOPPED: {
    handleUnknownRunningStopped();
    break;
  }
  default: {
    assert false;
    throw new IllegalStateException();
  }
}

However, I've noticed that instead of returning a PipelineResult.State value such as FAILED or CANCELLED, run() throws an exception:

  1. For a failed job, a DataflowJobExecutionException is thrown
  2. For a cancelled job, a DataflowJobCancelledException is thrown

What is the correct way (programmatically) to handle the result of a pipeline?


Solution

  • Both DataflowPipelineRunner and BlockingDataflowPipelineRunner return a PipelineResult that lets you query the pipeline's current state. The difference is timing: DataflowPipelineRunner returns its PipelineResult immediately after the job is submitted, whereas BlockingDataflowPipelineRunner does not return until the job has finished.

    In addition, BlockingDataflowPipelineRunner throws an exception if the job does not complete successfully: since you've asked for a blocking run(), we assume you want to know if something goes wrong. So if you've hard-coded the blocking runner, relying on these exceptions is an easy way to handle failure.

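    Note that your snippet uses the more general PipelineResult interface, but it won't work as intended with the non-blocking runner: that runner returns the result while the job is still in progress, so getState() will report a non-terminal state (for example RUNNING) and your switch will never see DONE, FAILED or CANCELLED.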
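For the blocking runner, the exception-based handling described in the answer can be sketched as follows. This is a minimal sketch, not SDK code: to keep it compilable without the Dataflow SDK, the exception classes are local stand-ins that only mirror the SDK names from the question (we assume the real ones are unchecked and share a common DataflowJobException base), the Supplier stands in for pipeline.run(), and runAndClassify and Outcome are hypothetical names.

```java
import java.util.function.Supplier;

public class BlockingRunHandling {

    // Hypothetical stand-ins that only mirror the SDK's exception names so this compiles
    // on its own; we assume the real classes are unchecked and share a common base.
    static class DataflowJobException extends RuntimeException {
        DataflowJobException(String message) { super(message); }
    }

    static class DataflowJobExecutionException extends DataflowJobException {   // job failed
        DataflowJobExecutionException(String message) { super(message); }
    }

    static class DataflowJobCancelledException extends DataflowJobException {   // job cancelled
        DataflowJobCancelledException(String message) { super(message); }
    }

    enum Outcome { SUCCEEDED, FAILED, CANCELLED }

    // blockingRun stands in for pipeline.run() on the blocking runner: it returns only
    // once the job has completed successfully and throws otherwise.
    // Any other exception is deliberately left to propagate to the caller.
    static Outcome runAndClassify(Supplier<?> blockingRun) {
        try {
            blockingRun.get();
            return Outcome.SUCCEEDED;
        } catch (DataflowJobCancelledException e) {
            return Outcome.CANCELLED;
        } catch (DataflowJobExecutionException e) {
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndClassify(() -> "result"));   // prints SUCCEEDED
        System.out.println(runAndClassify(() -> {
            throw new DataflowJobExecutionException("job failed");
        }));                                                   // prints FAILED
        System.out.println(runAndClassify(() -> {
            throw new DataflowJobCancelledException("job cancelled");
        }));                                                   // prints CANCELLED
    }
}
```

Catching the two exceptions separately keeps the failed and cancelled cases distinct, mirroring the FAILED and CANCELLED branches of the original switch.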
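With the non-blocking runner, one way to make a state-based switch meaningful is to poll getState() until the job reaches a terminal state and only then dispatch. The sketch below assumes DONE, FAILED and CANCELLED are the terminal states; State is a local stand-in for PipelineResult.State, the Supplier stands in for result.getState(), and waitForTerminalState and handle are hypothetical helpers. As far as we know, the DataflowPipelineJob returned by the non-blocking runner also offers a waitToFinish method that performs this wait for you; check its exact signature in your SDK version.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Supplier;

public class PollUntilTerminal {

    // Hypothetical stand-in mirroring the constants of PipelineResult.State.
    enum State { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED }

    // Assumption: DONE, FAILED and CANCELLED are final; the other states may still change.
    static boolean isTerminal(State state) {
        return state == State.DONE || state == State.FAILED || state == State.CANCELLED;
    }

    // Calls getState (standing in for result.getState()) until it reports a terminal
    // state or maxPolls calls have been made (at least one call is always made).
    // Returns the last state observed.
    static State waitForTerminalState(Supplier<State> getState, int maxPolls, long sleepMillis) {
        State state = getState.get();
        for (int polls = 1; polls < maxPolls && !isTerminal(state); polls++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return state;
            }
            state = getState.get();
        }
        return state;
    }

    // Same dispatch as the question's switch, applied only after waiting.
    static String handle(State state) {
        switch (state) {
            case DONE:      return "done";
            case FAILED:    return "failed";
            case CANCELLED: return "cancelled";
            default:        return "not finished: " + state;
        }
    }

    public static void main(String[] args) {
        // Simulated job that reports RUNNING twice and then DONE.
        Deque<State> states = new ArrayDeque<>(Arrays.asList(State.RUNNING, State.RUNNING, State.DONE));
        Supplier<State> fakeGetState = () -> states.size() > 1 ? states.poll() : states.peek();

        State finalState = waitForTerminalState(fakeGetState, 10, 100);
        System.out.println(handle(finalState)); // prints "done"
    }
}
```

Capping the number of polls keeps a stuck job from blocking the caller forever; if the cap is reached, handle() falls through to the not-finished branch, much like the RUNNING case in the original switch.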