
Create composite transformer spark


I am using an NGram Transformer then a CountVectorizerModel.

I need to be able to create a composite transformer for reuse later.

I was able to achieve this by making a List&lt;Transformer&gt; and looping through all the elements, but I want to know whether it is possible to create a single Transformer from two other Transformers.


Solution

  • This is straightforward: you just need to use the Pipeline API to create your pipeline:

    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.spark.ml.Pipeline;
    import org.apache.spark.ml.PipelineModel;
    import org.apache.spark.ml.PipelineStage;
    import org.apache.spark.ml.feature.CountVectorizer;
    import org.apache.spark.ml.feature.NGram;
    import org.apache.spark.ml.feature.Tokenizer;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    List<Row> data = Arrays.asList(
                RowFactory.create(0, "Hi I heard about Spark"),
                RowFactory.create(1, "I wish Java could use case classes"),
                RowFactory.create(2, "Logistic,regression,models,are,neat")
        );
    
    StructType schema = new StructType(new StructField[]{
                new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
    });
    
    // Build the training DataFrame (assumes an existing SparkSession named "spark").
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
    

    Now let's define our pipeline stages (tokenizer, NGram transformer and count vectorizer):

    Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
    
    NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
    
    CountVectorizer countVectorizer = new CountVectorizer()
      .setInputCol("ngrams")
      .setOutputCol("feature")
      .setVocabSize(3)
      .setMinDF(2);
    

    We can now create the pipeline and train it:

    Pipeline pipeline = new Pipeline()
                .setStages(new PipelineStage[]{tokenizer, ngramTransformer, countVectorizer});
    
    // Fit the pipeline to training documents.
    PipelineModel model = pipeline.fit(sentenceDataFrame);
    

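    Since the goal is to reuse the composite transformer later, the fitted PipelineModel can be persisted and reloaded as a single Transformer. A minimal sketch, assuming a writable path such as "/tmp/ngram-pipeline-model" (the path is an example, not from the original code):

    ```java
    // Persist the fitted pipeline model so it can be reused later.
    // "/tmp/ngram-pipeline-model" is an example path.
    model.write().overwrite().save("/tmp/ngram-pipeline-model");
    
    // Later, possibly in another application, reload it and apply it
    // to new data exactly like any other Transformer.
    PipelineModel reloaded = PipelineModel.load("/tmp/ngram-pipeline-model");
    reloaded.transform(sentenceDataFrame).show(false);
    ```

    The reloaded model runs all three stages (tokenizer, NGram, count vectorizer) in one transform call, which is effectively the composite Transformer asked about.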
    I hope this helps