I am conducting Principal Component Analysis in Spark Scala.
My output only displays the principal component score vectors. But (1) how can I display the principal component loadings, and (2) how can I select the variables that contribute most to the 1st and 2nd components (i.e., have particularly high loadings on them)? I want to focus on the first two components, since they usually capture the most variance.
Any help & advice appreciated!
What I have implemented so far:
import org.apache.spark.ml.feature.{PCA, StandardScaler, VectorAssembler}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
  Seq((1.2, 1.3, 1.7, 1.9), (2.2, 2.3, 2.7, 2.9), (3.2, 3.3, 3.5, 3.7))
).toDF("f1", "f2", "f3", "f4")
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2", "f3", "f4"))
  .setOutputCol("features")
val output = assembler.transform(dataset)
// Center each feature to zero mean (no scaling to unit variance)
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(false)
  .setWithMean(true)
val scalerModel = scaler.fit(output)
val scaledData = scalerModel.transform(output)
// Compute PCA on a vector of features
// Display the principal component score vectors
val pca = new PCA()
  .setInputCol("scaledFeatures")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(scaledData)
// ? How to extract the variables contributing most to principal components?
I have implemented a class in Python that computes practically everything related to principal components in PySpark. I realize your code is in Scala, but have a look in case it helps. The implementation of the class is as follows:
from pyspark import SparkConf,SparkContext
from pyspark.sql.context import HiveContext
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler,PCA,StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.param import Param,Params
from pyspark.sql.types import *
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import _convert_to_vector, Vectors, Matrix, DenseMatrix
from pyspark.sql.functions import array, col, explode, struct, lit, udf, sum, when,rowNumber,avg,pow,sqrt,mean,log,desc
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.sql.window import Window
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
import time, os, sys, json, math
import datetime as dt
import subprocess
import getpass
import pdb
import csv
import pandas as pd
import numpy as np
from copy import copy
from numpy.linalg import eigh
import itertools
class princomp:
    def __init__(self, n=5, std=True, prefix='pcomp'):
        self.n = n
        self.std = std
        self.prefix = prefix
        self.cols = [self.prefix + str(i) for i in list(range(1, self.n + 1))]
        self.inputcol = "std_features"
        self.outputcol = "pca_features"
        self.vars = 'all'
        self.model = PCA(k=self.n, inputCol=self.inputcol, outputCol=self.outputcol)
        self.components = sqlContext.createDataFrame(pd.DataFrame([0]))
        self.projections = sqlContext.createDataFrame(pd.DataFrame([0]))
        self.result = sqlContext.createDataFrame(pd.DataFrame([0]))
    def helpme(self):
        print('|-- Please note that output_parameters will have null values before calling the fit method')
        print('|-- n : input_parameter : sets the number of principal components, default = 5')
        print('|-- std : input_parameter : True/False value specifying whether to standardize the principal components, default = True')
        print('|-- prefix : input_parameter : string specifying the prefix for columns of principal components, default = pcomp')
        print('|-- vars : input_parameter : list of variable names to be used for principal components, default = all')
        print('|-- components : output_parameter : pandas dataframe of coefficients of different input columns for computing principal components')
        print('|-- result : output_parameter : spark dataframe of original variables joined with projections of principal components')
        print('|-- covariance : output_parameter : Numpy array of the covariance matrix')
        print('|-- eigvals : output_parameter : Numpy array of all eigenvalues')
        print('|-- eigvecs : output_parameter : Numpy array of all eigenvectors')
        print('|-- varianceexplained : output_parameter : variance explained by the n principal components')
        print('|-- outputcompfile(file) : method : outputs the components matrix to the specified file')
        print('|-- fit(inputdf, myfeatures) : method : fit method which computes all output parameters')
    # SET methods
    def setn(self, val):
        self.n = val
        self.cols = [self.prefix + str(i) for i in list(range(1, self.n + 1))]
        self.model = PCA(k=self.n, inputCol=self.inputcol, outputCol=self.outputcol)
    def setstd(self, val):
        self.std = val
    def setprefix(self, val):
        self.prefix = val
        self.cols = [self.prefix + str(i) for i in list(range(1, self.n + 1))]
    def setcols(self, val):
        self.cols = val
    def setinputcol(self, val):
        self.inputcol = val
        self.model = PCA(k=self.n, inputCol=self.inputcol, outputCol=self.outputcol)
    def setoutputcol(self, val):
        self.outputcol = val
        self.model = PCA(k=self.n, inputCol=self.inputcol, outputCol=self.outputcol)
    def setvars(self, val):
        self.vars = val
    def setmodel(self, val):
        self.model = val
    def setcomponents(self, val):
        self.components = val
    def setprojections(self, val):
        self.projections = val
    def setresult(self, val):
        self.result = val
    # GET methods
    def getn(self):
        return self.n
    def getstd(self):
        return self.std
    def getprefix(self):
        return self.prefix
    def getcols(self):
        return self.cols
    def getinputcol(self):
        return self.inputcol
    def getoutputcol(self):
        return self.outputcol
    def getvars(self):
        return self.vars
    def getmodel(self):
        return self.model
    def getcomponents(self):
        return self.components
    def getprojections(self):
        return self.projections
    def getresult(self):
        return self.result
    # CORE methods
    def vectorizedf(self, inputdf, vars='all'):
        """Returns the input spark dataframe with an additional column of dense vector features"""
        if vars == 'all':
            myfeatures = inputdf.columns
        else:
            myfeatures = vars
        assembler = VectorAssembler(inputCols=myfeatures, outputCol="features")
        assembled = assembler.transform(inputdf)
        # Cast any sparse vectors produced by the assembler to dense vectors
        as_dense = udf(
            lambda v: DenseVector(v.toArray()) if v is not None else None,
            VectorUDT()
        )
        df_dense = assembled.withColumn("features1", as_dense(assembled.features))
        df_dense2 = df_dense.drop("features")
        df_dense3 = df_dense2.withColumnRenamed("features1", "features")
        return df_dense3
    def outputcompfile(self, filewlocation):
        """Outputs the loadings of the principal components to the file specified"""
        df = self.components
        df.to_csv(filewlocation, index=False)
        print("Component matrix is now available at the location : " + filewlocation)
    def identity(self):
        """Outputs an identity matrix in the form of a features column in a dataframe"""
        iden = np.identity(len(self.vars)).tolist()
        rddi = sc.parallelize(iden)
        df_identity = rddi.map(lambda line: Row(std_features=Vectors.dense(line))).toDF()
        return df_identity
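    # NOTE: the fit method below calls a scalemeanstd helper that was not included in the
    # listing above. The following is a minimal sketch of what it presumably does
    # (centering and scaling a vector column with Spark's StandardScaler); treat it as an
    # assumption and adjust to match your own implementation.
    def scalemeanstd(self, df, inputcol="features", outputcol="std_features"):
        """Assumed helper: centers (and scales) the vector column inputcol into outputcol"""
        scaler = StandardScaler(inputCol=inputcol, outputCol=outputcol,
                                withMean=True, withStd=True)
        scalermodel = scaler.fit(df)
        return scalermodel.transform(df)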
    def fit(self, inputdf, myfeatures):
        """Fits the input dataframe in a PCA model with the given features"""
        start_time = time.time()  # Start timer
        if myfeatures == 'all':
            self.vars = inputdf.columns
        else:
            self.vars = myfeatures
        # Vectorize and scale
        df_dense = self.vectorizedf(inputdf, self.vars)
        df_normalized = self.scalemeanstd(df_dense)
        # Compute covariance matrix, eigenvalues and eigenvectors
        dfzeromean = df_normalized.select(self.inputcol)
        self.covariance = dfzeromean.rdd.map(lambda x: np.outer(x, x)).sum() / dfzeromean.count()
        col1 = self.covariance.shape[1]
        # eigh returns eigenvalues in ascending order, so reverse to descending
        eigvals, eigvecs = eigh(self.covariance)
        inds = np.argsort(eigvals)
        self.eigvals = eigvals[inds[-1:-(col1 + 1):-1]]
        self.eigvecs = -1 * eigvecs.T[inds[-1:-(col1 + 1):-1]]
        self.varianceexplained = np.sum(self.eigvals[0:self.n]) / np.sum(self.eigvals)
        # Fit PCA model
        model1 = self.model.fit(df_normalized)
        df_features = model1.transform(df_normalized)
        # Compute components (loadings) and put them in a pandas dataframe:
        # transforming an identity matrix through the fitted model yields one row of
        # loadings per input variable
        df_identity = self.identity()
        components = model1.transform(df_identity)
        components = components.withColumnRenamed('pca_features', 'components')
        edf_rdd = components.select("components").rdd.map(lambda x: tuple(x.components.toArray().tolist()))
        edf_pandas = edf_rdd.toDF(self.cols).toPandas()
        comp_ind = sqlContext.createDataFrame([Row(industries=self.vars)])
        comp_ind_pandas = comp_ind.select(explode(comp_ind.industries).alias("Variable")).toPandas()
        self.components = pd.concat([comp_ind_pandas, edf_pandas], axis=1)
        # Compute and standardize projections if self.std = True
        if self.std:
            projections1 = self.scalemeanstd(df_features, inputcol="pca_features", outputcol="projections")
        else:
            projections1 = df_features.withColumnRenamed('pca_features', 'projections')
        # Prepare data for output
        self.projections = projections1.select('projections')
        drop_list = ['features', 'std_features', 'pca_features']
        projections2 = projections1.select([c for c in projections1.columns if c not in drop_list])
        l = ['x.' + c for c in inputdf.columns]
        cst = '[' + ",".join(l) + ']'
        final_df = projections1.rdd.map(lambda x: tuple(eval(cst)) + tuple(x.projections.toArray().tolist())).toDF(inputdf.columns + self.cols)
        self.result = final_df
        print("PCA fitting took a total of %s seconds " % (time.time() - start_time))
The way to use this class would be as follows:
fs = ep.princomp(n=15)  # ep here is the module in which the princomp class is defined
fs.fit(df, select_features)  # select_features is the list of columns you wish to perform PCA on
fs.outputcompfile('/some/output/location/components.csv')
print (fs.eigvals)
print (fs.varianceexplained)
So you can write the components file to a location of your choice; it contains all the factor loadings. From that file (or directly from fs.components) you can find the variables that contribute most to the first n principal components.
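As a rough sketch (assuming the default column prefix pcomp, so the loading columns are named pcomp1, pcomp2, ...), you could rank the variables by the magnitude of their loadings on the first two components straight from the components attribute, which is a pandas dataframe:

comp = fs.components  # one row per input variable: "Variable", pcomp1, pcomp2, ...
# Sort variables by the absolute size of their loading on each of the first two components
top_pc1 = comp.reindex(comp['pcomp1'].abs().sort_values(ascending=False).index)[['Variable', 'pcomp1']]
top_pc2 = comp.reindex(comp['pcomp2'].abs().sort_values(ascending=False).index)[['Variable', 'pcomp2']]
print(top_pc1.head(10))  # variables with the largest absolute loadings on the 1st component
print(top_pc2.head(10))  # variables with the largest absolute loadings on the 2nd component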