Tags: scala, apache-spark, apache-spark-mllib, pipeline, transformer-model

java.lang.NoSuchMethodException: <Class>.<init>(java.lang.String) when copying custom Transformer


I'm currently playing with custom transformers in my spark-shell, using both Spark 2.0.1 and 2.2.1.

While writing a custom ML transformer to add to a pipeline, I noticed an issue with the override of the copy method.

In my case, the copy method is called by the fit method of TrainValidationSplit.

The error I get:

java.lang.NoSuchMethodException: Custom.<init>(java.lang.String)
  at java.lang.Class.getConstructor0(Class.java:3082)
  at java.lang.Class.getConstructor(Class.java:1825)
  at org.apache.spark.ml.param.Params$class.defaultCopy(params.scala:718)
  at org.apache.spark.ml.PipelineStage.defaultCopy(Pipeline.scala:42)
  at Custom.copy(<console>:16)
  ... 48 elided

I then tried to call the copy method directly, but I still get the same error.

Here is my class and the call I perform:

import org.apache.spark.ml.Transformer
import org.apache.spark.sql.{Dataset, DataFrame}
import org.apache.spark.sql.types.{StructField, StructType, DataTypes}
import org.apache.spark.ml.param.{Param, ParamMap}

// Simple DF
val doubles = Seq((0, 5d, 100d), (1, 4d,500d), (2, 9d,700d)).toDF("id", "rating","views")


class Custom(override val uid: String) extends Transformer {
  def this() = this(org.apache.spark.ml.util.Identifiable.randomUID("custom"))

  override def copy(extra: ParamMap): Custom = {
    defaultCopy(extra)
  }

  // transform produces a Boolean column, so declare it as Boolean here
  // (IntegerType would not match the actual output type)
  override def transformSchema(schema: StructType): StructType = {
    schema.add(StructField("trending", DataTypes.BooleanType, false))
  }

  override def transform(df: Dataset[_]): DataFrame = {
    df.withColumn("trending", df.col("rating") > 4 && df.col("views") > 40)
  }
}


val mycustom = new Custom("Custom")
// This call throws the exception. 
mycustom.copy(new org.apache.spark.ml.param.ParamMap())

Does anyone know if this is a known issue? I can't seem to find it anywhere.

Is there another way to implement the copy method in a custom transformer?

Thanks


Solution

  • Here are a couple of things I would change about your custom Transformer (also to enable SerDe operations on your PipelineModel):

    e.g.

    class Custom(override val uid: String) extends Transformer
        with DefaultParamsWritable {
          ...
          ...
    
    }
    object Custom extends DefaultParamsReadable[Custom]
    

    Do take a look at UnaryTransformer if you have only one input and one output column.
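For instance, here is a minimal single-column sketch (the RatingFlag name and its threshold are made up for illustration; it assumes Spark 2.x, where UnaryTransformer provides copy, transform and transformSchema for you):

```scala
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType}

// Hypothetical single-column variant of the transformer above:
// UnaryTransformer supplies copy(), transform() and transformSchema(),
// so only the element-level function and the output type are needed.
class RatingFlag(override val uid: String)
    extends UnaryTransformer[Double, Boolean, RatingFlag]
    with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("ratingFlag"))

  // flags ratings strictly greater than 4
  override protected def createTransformFunc: Double => Boolean = _ > 4.0

  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == DoubleType, s"Input type must be DoubleType but got $inputType")

  override protected def outputDataType: DataType = BooleanType
}

object RatingFlag extends DefaultParamsReadable[RatingFlag]
```

You would then just call setInputCol/setOutputCol on it before adding it to a pipeline.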

    Finally, what's the need to call mycustom.copy(new ParamMap()) exactly?
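Putting it together, a sketch of your transformer with the SerDe mix-ins added. (One hedged note: classes typed line-by-line into spark-shell are compiled as inner classes of the REPL wrapper, which is likely why defaultCopy's reflective lookup of a (String) constructor fails there; defining the class via :paste -raw, or in a compiled jar, avoids that.)

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

// The original transformer, plus DefaultParamsWritable so it can be
// persisted as part of a PipelineModel.
class Custom(override val uid: String) extends Transformer
    with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("custom"))

  override def copy(extra: ParamMap): Custom = defaultCopy(extra)

  override def transformSchema(schema: StructType): StructType =
    schema.add(StructField("trending", BooleanType, nullable = false))

  override def transform(df: Dataset[_]): DataFrame =
    df.withColumn("trending", df.col("rating") > 4 && df.col("views") > 40)
}

// Companion object: enables Custom.load(...) and pipeline deserialization.
object Custom extends DefaultParamsReadable[Custom]
```

With the companion object in place, a pipeline containing this stage can be saved with model.write.save(path) and reloaded with PipelineModel.load(path).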