
Spark - How to create a Spark DataFrame that contains an array of values in one of its columns for a CountVectorizer model


I am trying to run Spark's CountVectorizer model. As part of this, I am reading a CSV file and creating a DataFrame (inp_DF) from it.

It has three columns, as shown below:

+--------------+--------+-------+
|         State|Zip Code|Country|
+--------------+--------+-------+
|      kentucky|   40205|     us|
|       indiana|   47305|     us|
|greater london|    sw15|     gb|
|    california|   92707|     us|
|      victoria|    3000|     au|
|         paris|   75001|     fr|
|      illinois|   60608|     us|
|     minnesota|   55405|     us|
|    california|   92688|     us|
+--------------+--------+-------+

I need to create a fourth column in the same DataFrame that contains an array of the values of all three columns, such as:

|      kentucky|   40205|     us|   "kentucky","40205","us"
|       indiana|   47305|     us|   "indiana","47305","us"
|greater london|    sw15|     gb|   "greater london","sw15","gb"
|    california|   92707|     us|   "california","92707","us"
|      victoria|    3000|     au|   "victoria","3000","au"
|         paris|   75001|     fr|   "paris","75001","fr"
|      illinois|   60608|     us|   "illinois","60608","us"
|     minnesota|   55405|     us|   "minnesota","55405","us"
|    california|   92688|     us|   "california","92688","us"

Question 1: Is there a simple command, like .concat, to achieve this?

This array is needed because the input column for the CountVectorizer model must contain an array of values. It cannot be of string type, as the error message below shows:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column State must be of type equal to one of the following types: [ArrayType(StringType,true), ArrayType(StringType,false)] but was actually of type StringType.
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:58)
    at org.apache.spark.ml.feature.CountVectorizerParams$class.validateAndTransformSchema(CountVectorizer.scala:75)
    at org.apache.spark.ml.feature.CountVectorizer.validateAndTransformSchema(CountVectorizer.scala:123)
    at org.apache.spark.ml.feature.CountVectorizer.transformSchema(CountVectorizer.scala:188)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
    at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:155)
    at org.apache.spark.examples.ml.CountVectorizerExample$.main(CountVectorizerExample.scala:54)
    at org.apache.spark.examples.ml.CountVectorizerExample.main(CountVectorizerExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Java HotSpot(TM) Client VM warning: ignoring option MaxPermSize=300m; support was removed in 8.0

I tried to create an array from those three columns of the input DataFrame, but the array elements are enclosed in square brackets [ ].

A sample code snippet is given below for reference:

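// Assumes an existing SparkSession named spark
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

// Read Input Dataset for countVectorizer Logic
val inp_data = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("treatEmptyValuesAsNulls", "true")
      .option("nullValue", "")
      .load("Input.csv")

// Creating a Spark DataFrame from the Input Data
// (load() already returns a DataFrame, so toDF() does not change anything here)
val inp_DF = inp_data.toDF()

// Creating an array from Spark DataFrame columns
// collect() brings the rows to the driver as Array[Row]; it does not add a DataFrame column
val inp_array = inp_DF.select("State", "Zip Code", "Country").collect()
println(inp_array.mkString(","))

// fit a CountVectorizerModel from the corpus
// This fails: "State" is StringType, but CountVectorizer expects ArrayType(StringType)
val cvModel: CountVectorizerModel = new CountVectorizer()
      .setInputCol("State")
      .setOutputCol("features")
      .setVocabSize(4)
      .setMinDF(2)
      .fit(inp_DF)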
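For reference, the square brackets in the printed output are not part of the data. collect() returns a local Array[Row] on the driver (not a new DataFrame column), and Spark's Row.toString renders each row's fields between [ and ], so mkString(",") prints something like [kentucky,40205,us],[indiana,47305,us],... The plain-Scala sketch below models that rendering with ordinary sequences so it runs without Spark; the object and the renderRow helper are hypothetical and only imitate Row.toString.

```scala
// Plain-Scala model of why collect() + mkString(",") shows square brackets.
// Each inner Seq stands in for one collected Spark Row (State, Zip Code, Country).
object RowRenderingSketch {
  val rows: Seq[Seq[String]] = Seq(
    Seq("kentucky", "40205", "us"),
    Seq("indiana", "47305", "us"),
    Seq("greater london", "sw15", "gb")
  )

  // Hypothetical helper imitating Row.toString: fields joined by "," inside [ ].
  def renderRow(fields: Seq[String]): String = fields.mkString("[", ",", "]")

  // Equivalent of println(inp_array.mkString(",")) on the collected rows.
  def renderCollected(rs: Seq[Seq[String]]): String = rs.map(renderRow).mkString(",")

  def main(args: Array[String]): Unit = {
    println(renderCollected(rows))
    // The fields themselves are plain strings; quoting them is purely a display choice.
    println(rows.head.mkString("\"", "\",\"", "\""))
  }
}
```

So removing the brackets from the printed text would not help: CountVectorizer needs a DataFrame column of type array<string>, not a string built on the driver.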

Question 2: How do I remove the square brackets [] from those array elements and create a new column in the DataFrame with the values of the array?

Question 3: Can we supply a single column as input to the CountVectorizer model and get features as output?


Solution

  • You can use the array function to create the array column as

    import org.apache.spark.sql.functions._
    val inp_array = inp_DF.withColumn("arrayColumn", array("State", "Zip Code", "Country"))
    

    which should give you the following output:

    +-------------+--------+-------+-------------------------+
    |State        |Zip Code|Country|arrayColumn              |
    +-------------+--------+-------+-------------------------+
    |kentucky     |40205   |us     |[kentucky, 40205, us]    |
    |indiana      |47305   |us     |[indiana, 47305, us]     |
    |greaterlondon|sw15    |gb     |[greaterlondon, sw15, gb]|
    |california   |92707   |us     |[california, 92707, us]  |
    |victoria     |3000    |au     |[victoria, 3000, au]     |
    |paris        |75001   |fr     |[paris, 75001, fr]       |
    |illinois     |60608   |us     |[illinois, 60608, us]    |
    |minnesota    |55405   |us     |[minnesota, 55405, us]   |
    |california   |92688   |us     |[california, 92688, us]  |
    +-------------+--------+-------+-------------------------+
    

    You can then use this DataFrame to fit your CountVectorizer model:

    val cvModel: CountVectorizerModel = new CountVectorizer()
      .setInputCol("arrayColumn")
      .setOutputCol("features")
      .setVocabSize(4)
      .setMinDF(2)
      .fit(inp_array)
    

    That answers your first two questions.

    Now for your third question: yes, you can use a single column with CountVectorizer, but you first need to convert that column to ArrayType(StringType,true), which can be done with the array function as above.

    Suppose you want to use the State column in CountVectorizer. You can then change the datatype of the State column to an array by doing

    val single_arrayDF = inp_DF.withColumn("State", array("State"))
    

    and use it as

    val cvModel: CountVectorizerModel = new CountVectorizer()
      .setInputCol("State")
      .setOutputCol("features")
      .setVocabSize(4)
      .setMinDF(2)
      .fit(single_arrayDF)
    

    I hope the answer is helpful.
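To see what setVocabSize(4) and setMinDF(2) actually select from this sample data, the plain-Scala sketch below imitates CountVectorizer's vocabulary and counting steps without a Spark dependency. It is a simplified model, not Spark's implementation: it keeps terms that occur in at least minDF documents, ranks them by total count across the corpus, and keeps the top vocabSize; breaking ties alphabetically is an assumption of the sketch. The object and method names are hypothetical. It covers both the three-column arrayColumn documents and the one-element State documents.

```scala
// Simplified, dependency-free model of CountVectorizer's vocabulary and
// count-vector steps (an illustration, not Spark's actual code).
object CountVectorizerSketch {
  // The nine documents that array("State", "Zip Code", "Country") yields for the question's data.
  val docs: Seq[Seq[String]] = Seq(
    Seq("kentucky", "40205", "us"),
    Seq("indiana", "47305", "us"),
    Seq("greater london", "sw15", "gb"),
    Seq("california", "92707", "us"),
    Seq("victoria", "3000", "au"),
    Seq("paris", "75001", "fr"),
    Seq("illinois", "60608", "us"),
    Seq("minnesota", "55405", "us"),
    Seq("california", "92688", "us")
  )

  // The same rows as one-element documents, as array("State") would produce.
  val stateDocs: Seq[Seq[String]] = docs.map(doc => Seq(doc.head))

  // Keep terms found in at least minDF documents, rank by total count
  // (ties broken alphabetically, an assumption), then keep the first vocabSize.
  def vocabulary(ds: Seq[Seq[String]], vocabSize: Int, minDF: Int): Seq[String] = {
    val docFreq: Map[String, Int] =
      ds.flatMap(_.distinct).groupBy(identity).map { case (t, occ) => t -> occ.size }
    val termFreq: Map[String, Int] =
      ds.flatten.groupBy(identity).map { case (t, occ) => t -> occ.size }
    termFreq.toSeq
      .filter { case (t, _) => docFreq(t) >= minDF }
      .sortBy { case (t, n) => (-n, t) }
      .take(vocabSize)
      .map { case (t, _) => t }
  }

  // Dense count vector for one document over the given vocabulary.
  def countVector(vocab: Seq[String], doc: Seq[String]): Seq[Double] =
    vocab.map(term => doc.count(_ == term).toDouble)

  def main(args: Array[String]): Unit = {
    val vocab = vocabulary(docs, vocabSize = 4, minDF = 2)
    println(s"three-column vocabulary: ${vocab.mkString(", ")}")
    docs.foreach { doc =>
      println(s"${doc.mkString(", ")} -> ${countVector(vocab, doc).mkString("[", ", ", "]")}")
    }
    val stateVocab = vocabulary(stateDocs, vocabSize = 4, minDF = 2)
    println(s"State-only vocabulary: ${stateVocab.mkString(", ")}")
  }
}
```

On this sample, only us (6 documents) and california (2 documents) pass minDF = 2 for the three-column arrays, so the vocabulary has two terms even though vocabSize is 4, and a row such as victoria, 3000, au maps to an all-zero vector. For the single-column State arrays only california survives. Spark stores the features column as sparse vectors rather than the dense sequences printed here. If every value should end up in the vocabulary, minDF and vocabSize are likely the parameters to revisit.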