Writing to HBase in a Spark job: a conundrum with existential types

I am trying to write a Spark job that should put its output into HBase. As far as I can tell, the right way to do this is to use the method saveAsHadoopDataset on org.apache.spark.rdd.PairRDDFunctions; this requires that my RDD be composed of pairs.

The method saveAsHadoopDataset requires a JobConf, and this is what I am trying to construct. According to this link, one thing I have to set on my JobConf is the output format (in fact it doesn't work without it), like


The problem is that apparently this does not compile, because TableOutputFormat is generic, even though it ignores its type parameter. So I have tried various combinations, such as


but in any case I get an error

required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]

Now, as far as I can tell, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] translates to Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] }. Here is where I think I have a problem, because:

  • Class is invariant
  • TableOutputFormat[T] <: OutputFormat[T, Mutation], but
  • I am not sure how existential types interact with subtyping in the requirement T <: OutputFormat[_, _]

Is there a way to obtain a subtype of OutputFormat[_, _] from TableOutputFormat? It seems the problem arises from the differences between generics in Java and in Scala. What can I do about this?


It turns out this is even subtler. I have tried defining a method myself in the REPL

def foo(x: Class[_ <: OutputFormat[_, _]]) = x

and I can actually invoke it with


or even


for that matter. But I cannot call


The original signature of setOutputFormat in Java is void setOutputFormat(Class<? extends OutputFormat> theClass). How can I call it from Scala?
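A sketch of the kind of REPL experiment described above, using stand-in types rather than the real Hadoop and HBase classes (OutputFormat, Mutation and TableOutputFormat below are local, hypothetical definitions that only mirror the shapes mentioned in the question). It suggests that the existential bound itself is not the obstacle: a generic class in the right hierarchy is accepted whatever type argument is written, and the invariance of Class does not get in the way. Scala 2 syntax is assumed.

```scala
object ExistentialDemo {
  // Stand-ins only: these are NOT the Hadoop/HBase classes.
  trait OutputFormat[K, V]
  final class Mutation
  class TableOutputFormat[K] extends OutputFormat[K, Mutation]

  // Same shape as the bound in the compiler message.
  def foo(x: Class[_ <: OutputFormat[_, _]]): Class[_ <: OutputFormat[_, _]] = x

  // Each call type-checks: the compiler picks T = TableOutputFormat[...],
  // and every such T conforms to OutputFormat[_, _].
  val viaUnit     = foo(classOf[TableOutputFormat[Unit]])
  val viaString   = foo(classOf[TableOutputFormat[String]])
  val viaWildcard = foo(classOf[TableOutputFormat[_]])

  // Type arguments are erased, so all three are the same Class object.
  val allSame: Boolean = (viaUnit eq viaString) && (viaString eq viaWildcard)
}
```

If calls of this shape compile while the real setOutputFormat call does not, the class being passed is probably not a subtype of the expected OutputFormat at all, which points at the imports rather than at existential types.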


  • That's very strange. Are you 100% sure your imports are correct (EDIT: yes, this was the problem, see comments) and that you have the correct versions of the artifacts in your build file? Maybe it helps if I provide a code snippet from my working project:

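    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.mapred.JobConf
    // Old-API (mapred) class, not org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.mapred.TableOutputFormat

    val conf = HBaseConfiguration.create()
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    // The call from the question; it type-checks with the mapred import above
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    // outputTable: name of the target HBase table, defined elsewhere
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)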

    and some deps I have:

    "org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
    "org.apache.hbase" % "hbase-client" % "", 
    "org.apache.hbase" % "hbase-common" % "", 
    "org.apache.hbase" % "hbase-hadoop-compat" % "",
    "org.apache.hbase" % "hbase-it" % "",
    "org.apache.hbase" % "hbase-hadoop2-compat" % "",
    "org.apache.hbase" % "hbase-prefix-tree" % "", 
    "org.apache.hbase" % "hbase-protocol" % "", 
    "org.apache.hbase" % "hbase-server" % "",
    "org.apache.hbase" % "hbase-shell" % "", 
    "org.apache.hbase" % "hbase-testing-util" % "", 
    "org.apache.hbase" % "hbase-thrift" % "",
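
For context on the import issue: HBase ships two classes named TableOutputFormat, a non-generic one in org.apache.hadoop.hbase.mapred (old API) and a generic one in org.apache.hadoop.hbase.mapreduce (new API), and JobConf.setOutputFormat expects an old-API org.apache.hadoop.mapred.OutputFormat. Assuming the mix-up was between those two, the effect can be reproduced with stand-in types. Everything below (the nested mapred and mapreduce objects, the String type arguments, the setOutputFormat helper) is a hypothetical model, not the real library.

```scala
object ImportMixupDemo {
  // Stand-in for the old API: an interface-like OutputFormat and a
  // non-generic TableOutputFormat implementing it.
  object mapred {
    trait OutputFormat[K, V]
    class TableOutputFormat extends OutputFormat[String, String]
  }

  // Stand-in for the new API: an unrelated OutputFormat hierarchy
  // with a generic TableOutputFormat of the same simple name.
  object mapreduce {
    abstract class OutputFormat[K, V]
    class TableOutputFormat[K] extends OutputFormat[K, String]
  }

  // Models the bound from the error message: only old-API formats fit.
  def setOutputFormat(c: Class[_ <: mapred.OutputFormat[_, _]]): String = c.getName

  // Compiles: the old-API class satisfies the bound.
  val accepted: String = setOutputFormat(classOf[mapred.TableOutputFormat])

  // Does not compile (type mismatch), whatever type argument is used:
  // setOutputFormat(classOf[mapreduce.TableOutputFormat[_]])

  // The same fact, checked at runtime via reflection.
  val oldFits: Boolean =
    classOf[mapred.OutputFormat[_, _]].isAssignableFrom(classOf[mapred.TableOutputFormat])
  val newFits: Boolean =
    classOf[mapred.OutputFormat[_, _]].isAssignableFrom(classOf[mapreduce.TableOutputFormat[_]])
}
```

In this model no choice of type parameter rescues the new-API class, because it sits in a different hierarchy; switching the import is what makes the call compile.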