Search code examples
scalahadoopcompressioncascadingscalding

Compress Output Scalding / Cascading TsvCompressed


So people have been having problems compressing the output of Scalding Jobs including myself. After googling I get the odd hiff of an answer in a some obscure forum somewhere but nothing suitable for peoples copy and paste needs.

I would like an output like Tsv, but writes compressed output.


Solution

  • Anyway after much faffification I managed to write a TsvCompressed output which seems to do the job (you still need to set the hadoop job system configuration properties, i.e. set compress to true, and set the codec to something sensible or it defaults to crappy deflate)

    import com.twitter.scalding._
    import cascading.tuple.Fields
    import cascading.scheme.local
    import cascading.scheme.hadoop.{TextLine, TextDelimited}
    import cascading.scheme.Scheme
    import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
    
    case class TsvCompressed(p: String) extends FixedPathSource(p) with DelimitedSchemeCompressed
    
    trait DelimitedSchemeCompressed extends Source {
      val types: Array[Class[_]] = null
    
      override def localScheme = new local.TextDelimited(Fields.ALL, false, false, "\t", types)
    
      override def hdfsScheme = {
        val temp = new TextDelimited(Fields.ALL, false, false, "\t", types)
        temp.setSinkCompression(TextLine.Compress.ENABLE)
        temp.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
      }
    }