scala hadoop scalding

Scalding (older versions) counters based on cascading


Older versions of Scalding did not yet expose counters in their API. The post "Hadoop Counters In Scalding" suggests falling back to Cascading counters from within Scalding:

def addCounter(pipe : Pipe, group : String, counter : String) = {

  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_], 
        functionCall : FunctionCall[Any]) {

          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
            functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
          } catch {
            case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
          }
      }.discard('addCounter)
    }
  )
}

However, when I tried that, I got:

Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^

Am I missing something? The Scalding version I use is 0.8.7.


Solution

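  • .discard is a Scalding method on the pipe, so it belongs at the same level as .each, the other Scalding call in the code block. In the posted code it sits inside the body of the anonymous BaseOperation class, directly after the closing brace of operate, where a method call cannot start; that is why the compiler reports "';' expected but '.' found". Try moving it after the last closing parenthesis ")" (the second-to-last line of the code you posted).

    Here, operations are chained on the RichPipe pipe: first the each, then the discard: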

    pipe.each(...){predicate}.discard(...)
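
    For reference, a sketch of the posted function with .discard moved outside the anonymous class, so that it is chained onto the result of .each. This assumes Cascading 2.x package names and that Scalding's implicit conversions (symbols to Fields, Pipe to RichPipe) are in scope, for example via com.twitter.scalding.Dsl._ or by defining the method inside a Job. One deliberate deviation from the posted code, which is my own assumption about the intent: the output tuple is emitted after the try/catch, so rows still pass through when the cast fails in local mode.

```scala
import cascading.flow.FlowProcess
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.operation.{BaseOperation, Function, FunctionCall}
import cascading.pipe.Pipe
import cascading.tuple.Tuple
import com.twitter.scalding.Dsl._ // assumption: not needed inside a Job

def addCounter(pipe : Pipe, group : String, counter : String) = {
  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_],
        functionCall : FunctionCall[Any]) {

        try {
          // Increment the Hadoop counter once per input tuple
          flowProcess.asInstanceOf[HadoopFlowProcess]
            .increment(group, counter, 1L)
        } catch {
          case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
        }
        // Emit a placeholder value for the temporary 'addCounter field
        // (moved out of the try block; a change from the posted code)
        functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
      }
    }
  ).discard('addCounter) // chained on the result of each, not inside the class body
}
```

    The only structural change relative to the question is where .discard('addCounter) sits: after the parenthesis that closes the second argument list of each, rather than after the brace that closes operate.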