Search code examples
scalacascading

Compilation error when using Cascading 2.0 in scala


I am using cascading 2.0 in scala, and meet a weird compilation issue

This is main parts of the code

val sojSource = createSojSource(optionMap("input"))

val sinkScheme = createSojScheme(true)
val sink = new Hfs(sinkScheme, optionMap("output"), SinkMode.REPLACE)

var pipe = new Pipe("soj")
val each = new Each(pipe, new SojSampleFilter())

val flowDef = new FlowDef().addSource(pipe, sojSource).addTailSink(each, sink)// compile error

And here is the error message for the last line of code

type mismatch; found : cascading.tap.hadoop.Hfs required: cascading.tap.Tap[_, _, _] 
Note: org.apache.hadoop.mapred.JobConf <: Any (and cascading.tap.hadoop.Hfs <: cascading.tap.Tap[org.apache.hadoop.mapred.JobConf,org.apache.hadoop.mapred.RecordReader,org.apache.hadoop.mapred.OutputCollector]), but Java-defined class Tap is invariant in type Config. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) 
Note: org.apache.hadoop.mapred.RecordReader <: Any (and cascading.tap.hadoop.Hfs <: cascading.tap.Tap[org.apache.hadoop.mapred.JobConf,org.apache.hadoop.mapred.RecordReader,org.apache.hadoop.mapred.OutputCollector]), but Java-defined class Tap is invariant in type Input. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) 
Note: org.apache.hadoop.mapred.OutputCollector <: Any (and cascading.tap.hadoop.Hfs <: cascading.tap.Tap[org.apache.hadoop.mapred.JobConf,org.apache.hadoop.mapred.RecordReader,org.apache.hadoop.mapred.OutputCollector]), but Java-defined class Tap is invariant in type Output. You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) 

Could you, please, tell me how can I fix this and what is the reason?


Solution

  • I found this code in the scalding tree which works around this problem:

    // The scala compiler has problems with the generics in Cascading
    protected def castHfsTap(tap : Hfs) : Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
      tap.asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]]
    }