
How do I perform an operation only once, at the end of a Scalding job?


I read in the Scalding groupAll docs:

   /**
    * Group all tuples down to one reducer.
    * (due to cascading limitation).
    * This is probably only useful just before setting a tail such as Database
    * tail, so that only one reducer talks to the DB.  Kind of a hack.
    */
    def groupAll: Pipe = groupAll { _.pass }

This gave me good reason to believe that if I pipe my final write result into a statusUpdater pipe, which just updates some database to record that my job finished successfully, it would be performed once, after the job completes. However, I tried it with the following code:

import Dsl._
somepipe
  .addCount
  .toPipe(outputSchema)
  .write(Tsv(outputPath, outputSchema, writeHeader = true))(flowDef, mode)
  .groupAll.updateResultStatus

  implicit class StatusResultsUpdater(pipe: Pipe) {
    def updateResultStatus: Pipe = {
      println("DO THIS ONCE AFTER JOB COMPLETES!") // was printed even before the job ended! how to have it print only when job ends!?
      pipe
    }
  }

According to the docs, since I used groupAll, updateResultStatus should run only once, after the job ends. Why do I see the statement printed before the job ends? Am I missing something? What should I do to make this work?


Solution

  • The execution order in a Scalding job is a bit tricky:

    1. The initializer statements in the Job class are executed and the operation tree is built (connecting Pipes, Taps, etc.)
    2. The tree is handed off to the optimizer, and an execution plan is created
    3. The job starts executing: the Hadoop jobs' Map and Reduce steps are kicked off according to the plan
    4. The main program waits for everything to complete and exits
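    To see why the println fires early, it helps to model the two phases in plain Scala. This is only an illustration, not Scalding itself (FakePipe, PlanVsRun, and their members are made-up names): building a pipe merely records a description of the work, so any side effect in the builder code runs immediately, while the described steps run only when the plan is executed later.

    ```scala
    // A "pipe" here is just a description of work to do later.
    case class FakePipe(steps: List[String]) {
      def map(name: String): FakePipe = FakePipe(steps :+ name)
    }

    object PlanVsRun {
      val log = scala.collection.mutable.Buffer[String]()

      // Step 1: building the tree. The side effect runs NOW,
      // just like the println in updateResultStatus.
      def build(): FakePipe = {
        log += "side effect during construction"
        FakePipe(Nil).map("count").map("write")
      }

      // Step 3: only now do the described steps actually run.
      def execute(pipe: FakePipe): Unit =
        pipe.steps.foreach(s => log += ("ran " + s))
    }
    ```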

    In your code, the println statement executes during step 1, while the operation tree is being assembled, not after the job finishes. Calling updateResultStatus does not add anything to the tree; the println is plain Scala code that runs as soon as the method is invoked during construction.
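    One common way to run something exactly once after the flow finishes is to override the Job's run method and do the work after super.run returns (Scalding's Job exposes def run: Boolean, which blocks until the flow completes). The sketch below models the pattern with a stand-in base class so it is self-contained; BaseJob, MyJob, and statusUpdated are made-up names, and in a real job you would extend com.twitter.scalding.Job instead.

    ```scala
    // Stand-in for com.twitter.scalding.Job: run builds and executes
    // the flow, blocks until it completes, and reports success.
    class BaseJob {
      def run: Boolean = true
    }

    class MyJob extends BaseJob {
      var statusUpdated = false  // stands in for the DB status write

      override def run: Boolean = {
        val ok = super.run       // returns only after the flow has completed
        if (ok) {
          // Runs exactly once, after the job finished successfully:
          // update the status row in the database here.
          statusUpdated = true
        }
        ok
      }
    }
    ```

    With the real Scalding Job, super.run does not return until Cascading reports the flow as complete, so the database update happens once, and only after the job has actually finished.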