python, dataframe, apache-spark, pyspark, rdd

Creating DataFrame of different variable types


My raw data looks like this: a string followed by numbers.

"cat",21,6,160,110,3.9,2.62,16.46,0,1,4,4
"dog",21,6,160,110,3.9,2.875,17.02,0,1,4,4
...

When I create my RDD and DataFrame, I want to keep the string and cast the rest to floats. So the expected output for my DataFrame will have 12 columns: the first will be the string, and the remaining 11 will be floats.

Below is my code:

def parse_line(line):
    s = line.split(',')
    name = s[0]
    features = s[1:]
    features = [float(x) for x in features]
    return name, features

f = sc.textFile("animals.data")
rdd = f.map(parse_line)

df = sqlContext.createDataFrame(rdd)

The output only produced two columns:

+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|               "cat"| [21.0, 6.0, 160.0...|
|               "dog"| [21.0, 6.0, 160.0...|
|               "rat"| [22.8, 4.0, 108.0...|
|            "monkey"| [21.4, 6.0, 258.0...|
...

Solution

  • Option 1: The function parse_line returns a tuple with two elements: the name and the list of features. That is why the DataFrame has only two columns. To fix it, parse_line should return a flat tuple of 12 elements, one string followed by 11 floats:

    def parse_line(line):
        s = line.split(',')
        name = s[0]
        features = [float(x) for x in s[1:]]
        # return a flat 12-element tuple instead of (name, list_of_features)
        return (name,) + tuple(features)
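
    Without an explicit schema, the columns will still get the default names _1 through _12. As a minimal sketch, you can also pass a list of column names when creating the DataFrame (the names name and f1 through f11 below are placeholders, not from the original post):

    # build the RDD of flat 12-element tuples as above
    rdd = sc.textFile("animals.data").map(parse_line)
    # 12 column names: one for the string, 11 for the floats
    columns = ["name"] + ["f{}".format(i) for i in range(1, 12)]
    df = sqlContext.createDataFrame(rdd, columns)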
    

    Option 2: You can use Spark to read the data as CSV directly, without using Pandas. It is helpful to define the schema of the CSV before reading it, so that all numeric columns are treated as floats:

    from pyspark.sql import types as T
    schema = T.StructType([
      T.StructField("col1", T.StringType(), True),
      T.StructField("col2", T.FloatType(), True),
      T.StructField("col3", T.FloatType(), True),
      T.StructField("col4", T.FloatType(), True),
      T.StructField("col5", T.FloatType(), True),
      T.StructField("col6", T.FloatType(), True),
      T.StructField("col7", T.FloatType(), True),
      T.StructField("col8", T.FloatType(), True),
      T.StructField("col9", T.FloatType(), True),
      T.StructField("col10", T.FloatType(), True),
      T.StructField("col11", T.FloatType(), True),
      T.StructField("col12", T.FloatType(), True)])
    
    df = spark.read.schema(schema).csv("animals.data")
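
    With 12 columns, writing the schema out field by field is tedious. As a sketch, the same schema can also be built programmatically, assuming the first column is the string and the remaining 11 are floats (the names name and f1 through f11 are placeholders):

    from pyspark.sql import types as T

    # one StringType field followed by 11 FloatType fields
    schema = T.StructType(
        [T.StructField("name", T.StringType(), True)] +
        [T.StructField("f{}".format(i), T.FloatType(), True) for i in range(1, 12)])
    df = spark.read.schema(schema).csv("animals.data")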
    

    With either option, the result is a Spark DataFrame with one string column and 11 float columns.
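
    Either way, a quick df.printSchema() confirms the column types (output sketched here for the explicit col1 through col12 schema above):

    df.printSchema()
    # root
    #  |-- col1: string (nullable = true)
    #  |-- col2: float (nullable = true)
    #  ...
    #  |-- col12: float (nullable = true)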