Search code examples
pythoncsvapache-sparkpysparkpca

Pyspark Generic model for PCA data processing


I'm working on data analysis using PCA. I wrote this code with PySpark and it works perfectly, but only on data read from a CSV file with exactly 5 columns ["a","b","c","d","e"]. I want to write generic code that calculates PCA for whatever number of columns is read from the CSV file. What should I add? Here is my code:

#########################! importing libraries !########################
from __future__ import print_function
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.mllib.feature import Normalizer
import timeit
########################! main script !#################################
sc = SparkContext("local", "pca-app")
sqlContext = SQLContext(sc)
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()
    # Parse every line of the ';'-separated file into a list of floats and
    # bring all rows back to the driver as a plain Python list.
    data = sc.textFile('dataset.csv') \
        .map(lambda line: [float(k) for k in line.split(';')])\
        .collect()
    # Column names are hard-coded here, so the file must contain exactly
    # five values per line.
    df = spark.createDataFrame(data, ["a","b","c","d","e"])
    df.show()
    # Pack the individual numeric columns into the single vector column
    # that PCA reads as its input.
    vecAssembler = VectorAssembler(inputCols=["a","b","c","d","e"], outputCol="features")

    # Project the assembled features onto the first two principal components.
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    result.show(truncate=False)
    spark.stop()

Solution

  • You can make your code generic by changing a few lines:

    # Every line is parsed as ';'-separated numbers, so the file is expected
    # to contain data rows only (a header line would fail the float() call).
    fileObj = sc.textFile('dataset.csv')
    data = fileObj.map(lambda line: [float(k) for k in line.split(';')]).collect()
    # Count the fields of the first row with the same ';' delimiter used by
    # the parser above, then generate one positional name per column
    # ("c0", "c1", ...). Using the first row's values as names would yield
    # names containing dots and possibly duplicates.
    num_columns = len(fileObj.first().split(';'))
    columns = ["c{}".format(i) for i in range(num_columns)]
    df = spark.createDataFrame(data, columns)
    df.show()
    vecAssembler = VectorAssembler(inputCols=columns, outputCol="features")