apache-spark · google-cloud-storage · google-cloud-dataproc

What is the best way to wait for a Google Dataproc SparkJob in Java?


I am currently triggering a Spark job from a Spring REST service using Dataproc's Java Client API. The basics of the Spark job are:

  1. Initialize Spark
  2. Process data
  3. Store the results as a .json file in a GCS bucket

I store the results so that, once the Spark job has finished and written the JSON file, the REST service can read them back. However, Dataproc's Java Client API simply triggers the job and does not wait for it to finish. So what is the best way to wait for the Spark job to finish? I do not want to use Object.wait(int time), because different Spark jobs have different execution times.
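
For reference, the submission side currently looks roughly like this (cluster name, jar URI and main class are placeholders):

    import com.google.api.services.dataproc.Dataproc;
    import com.google.api.services.dataproc.model.Job;
    import com.google.api.services.dataproc.model.JobPlacement;
    import com.google.api.services.dataproc.model.SparkJob;
    import com.google.api.services.dataproc.model.SubmitJobRequest;
    import java.io.IOException;
    import java.util.Collections;
    
    // dataproc is an authorized Dataproc client built with credentials, HttpTransport, etc.
    private Dataproc dataproc;
    
    public String submitSparkJob(String projectId) throws IOException {
      Job job = new Job()
          .setPlacement(new JobPlacement().setClusterName("my-cluster"))
          .setSparkJob(new SparkJob()
              .setMainClass("com.example.MySparkJob")
              .setJarFileUris(Collections.singletonList("gs://my-bucket/my-spark-job.jar")));
      Job submitted = dataproc.projects().regions().jobs()
          .submit(projectId, "global", new SubmitJobRequest().setJob(job))
          .execute();
      // submit() returns immediately; the Spark job keeps running on the cluster.
      return submitted.getReference().getJobId();
    }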


Solution

  • Through the Dataproc REST API, calling GET on the job will return information about the job status. In general, you can simply have a polling loop:

    import com.google.api.services.dataproc.Dataproc;
    import com.google.api.services.dataproc.model.Job;
    import com.google.common.collect.ImmutableSet;
    import java.io.IOException;
    
    // States in which the job will make no further progress.
    public static final ImmutableSet<String> TERMINAL_JOB_STATES =
        ImmutableSet.of("CANCELLED", "DONE", "ERROR");
    
    // Initialize this as normal with credentials, setAppName, HttpTransport, etc.
    private Dataproc dataproc;
    
    public void waitJob(String projectId, String jobId) throws IOException, InterruptedException {
      Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
      while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) {
        System.out.println("Job not done yet; current state: " + job.getStatus().getState());
        Thread.sleep(5000);
        job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
      }
      System.out.println("Job terminated in state: " + job.getStatus().getState());
    }
    

    You may also want to wrap the .execute() calls in try/catch blocks catching IOException, in case the error is some kind of transient network problem (any HTTP 5xx error should simply be retried). You may also want a maximum wait time in case something is blocking the job from ever completing, or in case you are inadvertently retrying on a 404 Not Found error.
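
    A rough sketch of such a wrapper, assuming it lives in the same class as waitJob above (the helper name, the deadline handling, and treating every other IOException as transient are assumptions on my part):

    import com.google.api.client.googleapis.json.GoogleJsonResponseException;
    
    // deadlineMillis is an absolute System.currentTimeMillis() deadline, e.g. now + maxWait.
    // Retries the GET on transient errors and gives up once the deadline passes, so a stuck
    // or vanished job cannot block the caller forever.
    private Job getJobWithRetry(String projectId, String jobId, long deadlineMillis)
        throws IOException, InterruptedException {
      while (true) {
        try {
          return dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
        } catch (GoogleJsonResponseException e) {
          // 4xx (e.g. 404 job not found) is not transient; rethrow immediately.
          if (e.getStatusCode() < 500 || System.currentTimeMillis() > deadlineMillis) {
            throw e;
          }
        } catch (IOException e) {
          // Assume other IOExceptions are transient network errors, but respect the deadline.
          if (System.currentTimeMillis() > deadlineMillis) {
            throw e;
          }
        }
        Thread.sleep(5000);
      }
    }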

    You should also be able to detect 404 Not Found errors from any thrown IOException; that would happen if the job was deleted before polling completed, or if a bug caused you to enter the waitJob call despite a failed SubmitJob call. Experiment with GETting a nonexistent job and inspecting the error it produces, so you can avoid infinite-looping in such a case.
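
    With the generated client, for instance, a missing job should surface as a GoogleJsonResponseException, so you can probe a nonexistent job ID and inspect what comes back (a quick sketch; the exact error body may vary):

    try {
      dataproc.projects().regions().jobs().get(projectId, "global", "no-such-job-id").execute();
    } catch (GoogleJsonResponseException e) {
      // Expect a 404 here; anything in the 4xx range should be treated as terminal, not retried.
      System.out.println("HTTP status: " + e.getStatusCode());
      System.out.println("Error body: " + e.getDetails());
    }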