apache-spark, pyspark, apache-spark-mllib

How to read a csv to use in pyspark MLlib?


I have a CSV file that I'm trying to use as input to a KMeans algorithm in pyspark. I'm using the code from the MLlib documentation.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("P.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center) 

I'm getting the error:

java.lang.NumberFormatException: For input string: "-6.71,-1.14"

I tried to read the file as

dataset = spark.read.format("csv").load("P.txt")

But I get another error:

java.lang.IllegalArgumentException: Field "features" does not exist. Available fields: _c0, _c1

I'm a beginner in pyspark; I tried to look for tutorials on this but I didn't find any.


Solution

  • I found the problem. The DataFrame passed to kmeans.fit needs to have a "features" column, as the error java.lang.IllegalArgumentException: Field "features" does not exist. Available fields: _c0, _c1 was indicating.

    To build that column we need a VectorAssembler, but first we need to cast the columns to a numeric type; otherwise we get the error java.lang.IllegalArgumentException: Data type string of column _c0 is not supported.

    from pyspark.sql.functions import col
    from pyspark.ml.feature import VectorAssembler
    
    df = spark.read.csv('P.txt')
    # Cast every column from string to float so VectorAssembler can handle them
    df = df.select(*(col(c).cast("float").alias(c) for c in df.columns))
    
    assembler = VectorAssembler(
        inputCols=["_c0", "_c1"],
        outputCol="features")
    
    # Combine the numeric columns into the "features" vector and drop the originals
    df = assembler.transform(df)
    df = df.drop("_c0", "_c1")
    df.show()
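
    With the "features" column in place, the original KMeans code from the question should run unchanged on this DataFrame. A minimal sketch, continuing from the df built above (it assumes an existing SparkSession, as in the question):

    from pyspark.ml.clustering import KMeans
    from pyspark.ml.evaluation import ClusteringEvaluator
    
    # Fit KMeans on the assembled "features" column
    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(df)
    
    # Assign each row to a cluster and compute the Silhouette score
    predictions = model.transform(df)
    silhouette = ClusteringEvaluator().evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    
    # Inspect the learned cluster centers
    for center in model.clusterCenters():
        print(center)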