scala, apache-spark, pipeline, rdd, seq

Spark: How to transform an RDD into a Seq to be used in a pipeline


I want to use the Pipeline implementation in MLlib. Previously, I had an RDD and passed it directly to the model creation, but now, to use a Pipeline, a sequence of LabeledDocument has to be passed to the pipeline.

I have my RDD which is created as follows:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val data = sc.textFile("/test.csv")
val parsedData = data.map { line =>
  val parts = line.split(',')
  // parts.tail is an Array[String]; convert each feature to Double before building the vector
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}.cache()

In the pipeline example from the Spark Programming Guide, the pipeline needs the training data in the following form:

// Prepare training documents, which are labeled.
val training = sparkContext.parallelize(Seq(
  LabeledDocument(0L, "a b c d e spark", 1.0),
  LabeledDocument(1L, "b d", 0.0),
  LabeledDocument(2L, "spark f g h", 1.0),
  LabeledDocument(3L, "hadoop mapreduce", 0.0),
  LabeledDocument(4L, "b spark who", 1.0),
  LabeledDocument(5L, "g d a y", 0.0),
  LabeledDocument(6L, "spark fly", 1.0),
  LabeledDocument(7L, "was mapreduce", 0.0),
  LabeledDocument(8L, "e spark program", 1.0),
  LabeledDocument(9L, "a e c l", 0.0),
  LabeledDocument(10L, "spark compile", 1.0),
  LabeledDocument(11L, "hadoop software", 0.0)))

I need a way to change my RDD (parsedData) into a sequence of LabeledDocument instances (like training in the example above).

I appreciate your help.


Solution

  • I found an answer to this question.

    I can transform my RDD (parsedData) into a SchemaRDD, which is a sequence of LabeledDocuments, with the following code:

    val rddSchema = parsedData.toSchemaRDD
    
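    For reference, toSchemaRDD only compiles if the createSchemaRDD implicit from SQLContext is in scope. A minimal sketch of that setup, assuming the Spark 1.x SQL API:

        import org.apache.spark.sql.SQLContext

        val sqlContext = new SQLContext(sc)
        // brings the implicit RDD[case class] -> SchemaRDD conversion into scope
        import sqlContext.createSchemaRDD

        // LabeledPoint is a case class, so the conversion applies to parsedData
        val rddSchema = parsedData.toSchemaRDD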

    Now the problem has changed! I want to split the new rddSchema into training (80%) and test (20%) sets. If I use randomSplit, it returns an Array[RDD[Row]] instead of SchemaRDDs.

    New problem: how to transform an Array[RDD[Row]] back into SchemaRDDs -- OR -- how to split a SchemaRDD so that the resulting pieces are themselves SchemaRDDs?
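
    One possible direction (a sketch rather than a verified answer, assuming the Spark 1.x SQLContext.applySchema API): split with randomSplit, then re-attach the original schema to each piece so they become SchemaRDDs again:

        // randomSplit on a SchemaRDD yields plain RDD[Row]s
        val Array(trainingRows, testRows) = rddSchema.randomSplit(Array(0.8, 0.2), seed = 11L)

        // re-apply the schema of the original SchemaRDD to each split
        val trainingData = sqlContext.applySchema(trainingRows, rddSchema.schema)
        val testData     = sqlContext.applySchema(testRows, rddSchema.schema)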