Tags: scala, apache-spark, pca, apache-spark-ml

Select most important variables in terms of their contributions to PCA in Spark


I am running Principal Component Analysis (PCA) in Spark using Scala.

My output only displays the principal component score vectors. But (1) how can I display the principal component loadings, and (2) how do I select the variables that contribute most to the 1st and 2nd components (i.e., that have particularly high loadings on those components)? I focus on the first two components since they usually capture the most variance.

Any help & advice appreciated!

What I implemented so far:

    import org.apache.spark.ml.feature.{PCA, StandardScaler, VectorAssembler}
    import org.apache.spark.ml.linalg.Vectors

    val dataset = spark.createDataFrame(
      Seq((1.2, 1.3, 1.7, 1.9), (2.2, 2.3, 2.7, 2.9), (3.2, 3.3, 3.5, 3.7))
    ).toDF("f1", "f2", "f3", "f4")

    val assembler = new VectorAssembler()
      .setInputCols(Array("f1", "f2", "f3", "f4"))
      .setOutputCol("features")

    val output = assembler.transform(dataset)

    // Normalize each feature to have zero mean
    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithStd(false)
      .setWithMean(true)

    val scalerModel = scaler.fit(output)
    val scaledData = scalerModel.transform(output)


    // Compute PCA on the vector of scaled features, keeping the first two components
    val pca = new PCA()
      .setInputCol("scaledFeatures")
      .setOutputCol("pcaFeatures")
      .setK(2)
      .fit(scaledData)

    // Display the principal component score vectors
    pca.transform(scaledData).select("pcaFeatures").show(false)

    // ? How to extract the variables that contribute most to the principal components?

Solution

  • I have implemented a class in Python that computes pretty much everything related to principal components in PySpark. I realize your code is in Scala, but you may still find it helpful to look at. The implementation of the class is as follows:

    from pyspark import SparkConf,SparkContext
    from pyspark.sql.context import HiveContext
    from pyspark.sql import Row
    from pyspark.ml.feature import VectorAssembler,PCA,StandardScaler
    from pyspark.ml.clustering import KMeans
    from pyspark.ml.param import Param,Params
    from pyspark.sql.types import *
    from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
    from pyspark.mllib.linalg.distributed import RowMatrix
    from pyspark.mllib.linalg import _convert_to_vector, Vectors, Matrix, DenseMatrix
    from pyspark.sql.functions import array, col, explode, struct, lit, udf, sum, when,rowNumber,avg,pow,sqrt,mean,log,desc
    from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
    from pyspark.sql.window import Window
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.regression import LinearRegression
    import time, os, sys, json, math
    import datetime as dt
    import subprocess
    import getpass
    import pdb
    import csv
    import pandas as pd
    import numpy as np
    from copy import copy
    from numpy.linalg import eigh
    import itertools
    
    class princomp:
        def __init__(self,n=5,std=True,prefix='pcomp'):
            self.n=n
            self.std=std
            self.prefix=prefix
            self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
            self.inputcol = "std_features"
            self.outputcol = "pca_features"
            self.vars = 'all'
            self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
            self.components = sqlContext.createDataFrame(pd.DataFrame([0]))
            self.projections = sqlContext.createDataFrame(pd.DataFrame([0]))
            self.result = sqlContext.createDataFrame(pd.DataFrame([0]))
        def helpme(self):
            print ('|-- Please note that output_parameters will have null values before calling the fit method')
            print ('|-- n : input_parameter : sets the number of principal components, default = 5')
            print ('|-- std : input_parameter : True/False value specifying whether to standardize the principal components, default = True')
            print ('|-- prefix : input_parameter : string specifying the prefix for columns of principal components , default = pcomp')
            print ('|-- vars : input_parameter : list of variable names to be used for principal components, default = all')
            print ('|-- components : output_parameter : pandas dataframe of coefficients of different input columns for computing principal components')
            print ('|-- result : output_parameter : spark dataframe of original variables joined with projections of principal components')
            print ('|-- covariance : output_parameter : Numpy array of the covariance matrix')
            print ('|-- eigvals : output_parameter : Numpy array of all eigenvalues')
            print ('|-- eigvecs : output_parameter : Numpy array of all eigenvectors')
            print ('|-- varianceexplained : output_parameter : variance explained by the n principal components')
            print ('|-- outputcompfile(file) : method : outputs the components matrix to the specified file')
            print ('|-- fit(inputdf,myfeatures) : method : fit method which computes all output parameters')
        # SET methods
        def setn(self,val):
            self.n = val
            self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
            self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
        def setstd(self,val):
            self.std = val
        def setprefix(self,val):
            self.prefix=val
            self.cols = [self.prefix+str(i) for i in list(range(1,self.n+1))]
        def setcols(self,val):
            self.cols = val
        def setinputcol(self,val):
            self.inputcol = val
            self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
        def setoutputcol(self,val):
            self.outputcol = val
            self.model = PCA(k=self.n,inputCol=self.inputcol,outputCol=self.outputcol)
        def setvars(self,val):
            self.vars = val
        def setmodel(self,val):
            self.model = val
        def setcomponents(self,val):
            self.components=val
        def setprojections(self,val):
            self.projections=val
        def setresult(self,val):
            self.result = val
        # GET methods
        def getn(self):
            return self.n
        def getstd(self):
            return self.std
        def getprefix(self):
            return self.prefix
        def getcols(self):
            return self.cols
        def getinputcol(self):
            return self.inputcol
        def getoutputcol(self):
            return self.outputcol
        def getvars(self):
            return self.vars
        def getmodel(self):
            return self.model
        def getcomponents(self):
            return self.components
        def getprojections(self):
            return self.projections
        def getresult(self):
            return self.result
        # CORE methods
        def vectorizedf(self, inputdf, vars='all'):
            """Returns the input spark dataframe with an additional column of dense vector features"""
            if vars=='all':
                myfeatures = inputdf.columns
            else:
                myfeatures=vars
    
            assembler = VectorAssembler(inputCols = myfeatures,outputCol="features")
            assembled = assembler.transform(inputdf)
    
            as_dense = udf(
                lambda v: DenseVector(v.toArray()) if v is not None else None,
                VectorUDT()
            )
    
            df_dense = assembled.withColumn("features1", as_dense(assembled.features))
            df_dense2 = df_dense.drop("features")
            df_dense3 = df_dense2.withColumnRenamed("features1","features")
            return df_dense3
        def outputcompfile(self,filewlocation):
            """ Outputs the loading of principal components to the file specified"""
            df = self.components
            df.to_csv(filewlocation,index=False)
            print ("Component matrix is now available at the location : "+filewlocation)
        def identity(self):
            """ Outputs an identity matrix in the form of features column in a dataframe"""
            iden = np.identity(len(self.vars)).tolist()
            rddi = sc.parallelize(iden)
            df_identity = rddi.map(lambda line:Row(std_features=Vectors.dense(line))).toDF()
            return df_identity
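        # NOTE: scalemeanstd() is called from fit() below but is not included in this
        # snippet. A minimal stand-in (an assumption, not the author's original code)
        # that centers -- and optionally standardizes -- a vector column could be:
        def scalemeanstd(self, df, inputcol="features", outputcol=None):
            """Assumed helper: returns df with a centered/scaled copy of inputcol"""
            outputcol = outputcol or self.inputcol
            scaler = StandardScaler(inputCol=inputcol, outputCol=outputcol,
                                    withMean=True, withStd=self.std)
            return scaler.fit(df).transform(df)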
        def fit(self,inputdf,myfeatures):
            """ Fits the input dataframe in a PCA model with the given features """
            start_time = time.time()      # Start Timer
            if myfeatures=='all':
                self.vars = inputdf.columns
            else:
                self.vars = myfeatures
            # vectorize and scale
            df_dense = self.vectorizedf(inputdf,self.vars)
            df_normalized = self.scalemeanstd(df_dense)
            # Compute covariance matrix, eigenvalues and eigenvectors
            dfzeromean = df_normalized.select(self.inputcol)
            self.covariance = dfzeromean.map(lambda x:np.outer(x,x)).sum()/dfzeromean.count()
            col1 = self.covariance.shape[1]
            eigvals,eigvecs = eigh(self.covariance)
            inds = np.argsort(eigvals)
            self.eigvals = eigvals[inds[-1:-(col1+1):-1]]
            self.eigvecs = -1*eigvecs.T[inds[-1:-(col1+1):-1]]
            self.varianceexplained = np.sum(self.eigvals[0:self.n])/np.sum(self.eigvals)
            # Fit PCA model
            model1 = self.model.fit(df_normalized)
            df_features = model1.transform(df_normalized)
            # Compute components and put in a pandas dataframe
            df_identity = self.identity()
            components = model1.transform(df_identity)
            components = components.withColumnRenamed('pca_features','components')
            edf_rdd = components.select("components").rdd.map(lambda x: tuple(x.components.toArray().tolist()))
            edf_pandas = edf_rdd.toDF(self.cols).toPandas()
            comp_ind = sqlContext.createDataFrame([Row(industries=self.vars)])
            comp_ind_pandas = comp_ind.select(explode(comp_ind.industries).alias("Variable")).toPandas()
            self.components = pd.concat([comp_ind_pandas,edf_pandas],axis=1)
            # Compute and standardize projections if self.std = True
            if self.std:
                projections1=self.scalemeanstd(df_features,inputcol = "pca_features",outputcol = "projections")
            else:
                projections1 = df_features.withColumnRenamed('pca_features','projections')
            # Prepare data for output
            self.projections = projections1.select('projections')
            drop_list = ['features','std_features','pca_features']
            projections2 = projections1.select([c for c in projections1.columns if c not in drop_list])
            l = ['x.'+c for c in inputdf.columns]
            cst = '['+",".join(l)+']'
            final_df = projections1.rdd.map(lambda x: tuple(eval(cst))+tuple(x.projections.toArray().tolist())).toDF(inputdf.columns+self.cols)
            self.result = final_df
            print("PCA fitting took a total of %s seconds " % (time.time() - start_time))
    

    The way to use this class would be as follows:

    fs = ep.princomp(n=15)
    fs.fit(df,select_features) # here select_features is the list of columns that you wish to perform PCA on
    fs.outputcompfile('/some/output/location/components.csv')
    print (fs.eigvals)
    print (fs.varianceexplained)
    

    So you can write the components file to a location of your choice; it contains all the factor loadings. From that file you can identify the variables that contribute most to the first n principal components, i.e., the ones with the largest absolute loadings.
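
    As a rough illustration (not part of the original answer), once the components CSV exists you could rank variables by absolute loading with plain pandas. This assumes the layout produced by outputcompfile() above: a Variable column plus one column per component named pcomp1, pcomp2, and so on.

        import pandas as pd

        # Load the loadings written by outputcompfile(); columns: Variable, pcomp1, pcomp2, ...
        loadings = pd.read_csv('/some/output/location/components.csv')

        # For each of the first two components, sort variables by absolute loading,
        # largest first -- these are the variables contributing most to that component.
        for comp in ['pcomp1', 'pcomp2']:
            order = loadings[comp].abs().sort_values(ascending=False).index
            print(loadings.loc[order, ['Variable', comp]])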