Tags: scala, apache-spark, pyspark, apache-spark-sql, apache-spark-ml

Should we parallelize a DataFrame like we parallelize a Seq before training?


Consider the code given here:

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

Assuming we read "training" as a DataFrame using sqlContext.read(), should we still do something like

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

or will the fit function automatically take care of parallelizing the computation/data when it is passed a DataFrame?

Regards,


Solution

  • DataFrame is a distributed data structure, so it is neither required nor possible to parallelize it. The SparkContext.parallelize method is used only to distribute local data structures that reside in the driver's memory. It should not be used to distribute large datasets, let alone to redistribute RDDs or higher-level data structures, as you do in your previous question with:

    sc.parallelize(trainingData.collect()) 
    

    Since a DataFrame is already distributed, you can pass it to fit as-is (see the sketch after this answer). If you want to convert between RDD and DataFrame (Dataset), use the methods designed for it:

    1. From DataFrame to RDD:

      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.Row
      import org.apache.spark.rdd.RDD
      import spark.implicits._ // in 1.x: import sqlContext.implicits._

      val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
      val rdd: RDD[Row] = df.rdd
      
    2. From RDD to DataFrame:

      val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
      val df1: DataFrame = rdd.toDF // requires the implicits import shown above
      // or
      val df2: DataFrame = spark.createDataFrame(rdd) // in 1.x: sqlContext.createDataFrame
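
To make the answer concrete: the DataFrame-based spark.ml API accepts a DataFrame directly in fit, and Spark distributes the computation across the cluster on its own. Below is a minimal sketch, assuming Spark 2.x's SparkSession (in 1.x, sqlContext plays the same role) and a hypothetical libsvm-format training file at data/training.libsvm:

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("lr-example").getOrCreate()

    // The loaded DataFrame is already partitioned across the cluster;
    // there is nothing to parallelize by hand.
    val training = spark.read.format("libsvm").load("data/training.libsvm")

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    // fit consumes the distributed DataFrame directly and runs the
    // optimization on the executors.
    val model1 = lr.fit(training)

The only time sparkContext.parallelize belongs in this picture is when the data starts out as a local Seq on the driver, as in the snippet quoted in the question.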