
PySpark 2.4 - Read CSV file with custom line separator


Support for custom line separators (for various text file formats) was added to Spark in 2017 (see: https://github.com/apache/spark/pull/18581).

... or maybe it wasn't added in 2017 - or ever (see: https://github.com/apache/spark/pull/18304).

Today, with PySpark 2.4.0, I am unable to use a custom line separator to parse CSV files.

Here's some code:

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType
)

list_structs = StructType([
    StructField('id', StringType(), True),
    StructField('desc', StringType(), True)
])

df = spark.read.load("mnt/one.csv",
                     format="csv", 
                     sep="\x1e",
                     schema=list_structs)
print("one.csv rowcount: {}".format(df.count()))

df2 = spark.read.load("mnt/two.csv",
                     format="csv", 
                     sep="\x1e",
                     lineSep="\x1d",
                     schema=list_structs)
print("two.csv rowcount: {}".format(df2.count()))

Here are two sample CSV files. one.csv - lines are separated by the line feed character (0x0A):

"1","foo"
"2","bar"
"3","foobar"

two.csv - lines are separated by the group separator character (0x1D), which is non-printing, so the records run together below:

"1","foo""2","bar""3","foobar"
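For anyone wanting to reproduce this, the two sample files can be written from plain Python. The local filenames below are hypothetical stand-ins for the mounted paths in the question, and the fields are comma-separated exactly as shown in the listings above (the read code uses `sep="\x1e"`, so substitute `"\x1e"` for the comma if your real files use the record separator character between fields):

```python
# Write the two sample files from the question.
# NOTE: filenames are hypothetical stand-ins for "mnt/one.csv" and
# "mnt/two.csv"; fields are comma-separated as in the listings above.
rows = ['"1","foo"', '"2","bar"', '"3","foobar"']

# one.csv: records separated by the line feed character (0x0A)
with open("one.csv", "w", newline="") as f:
    f.write("\n".join(rows))

# two.csv: records separated by the group separator character (0x1D)
with open("two.csv", "w", newline="") as f:
    f.write("\x1d".join(rows))
```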

I want the output from the code to be:

one.csv rowcount: 3
two.csv rowcount: 3

The output I receive is:

one.csv rowcount: 3
two.csv rowcount: 1

Any ideas on how I can get PySpark to accept the group separator character as a line separator?


Solution

  • I can get the result I want with this:

    import pandas as pd
    
    padf = pd.read_csv("/dbfs/mnt/two.csv",
                       engine="c",
                       sep="\x1e",
                       lineterminator="\x1d",
                       header=None,
                       names=['id', 'desc'])
    df = sqlContext.createDataFrame(padf)
    print("two.csv rowcount: {}".format(df.count()))
    

    This depends on pandas, and the data might be read twice here (I'm not sure what happens internally when an RDD is created from a pandas DataFrame).
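If the pandas dependency (and the possible double read) is a concern, the splitting can also be done by hand before handing the rows to Spark. A minimal sketch of just the parsing step in plain Python, assuming 0x1D separates records and 0x1E separates fields as in the question:

```python
import csv
import io

def parse_records(text, record_sep="\x1d", field_sep="\x1e"):
    """Split raw file content on the record separator, then let the
    csv module handle quoting within each individual record."""
    rows = []
    for record in text.split(record_sep):
        if record:  # skip a possible trailing empty record
            rows.append(next(csv.reader(io.StringIO(record),
                                        delimiter=field_sep)))
    return rows

raw = '"1"\x1e"foo"\x1d"2"\x1e"bar"\x1d"3"\x1e"foobar"'
rows = parse_records(raw)
print(rows)  # [['1', 'foo'], ['2', 'bar'], ['3', 'foobar']]
```

The resulting list can then be turned into a DataFrame with something like `spark.createDataFrame(rows, schema=list_structs)`. Whether this scales is another question, since the whole file content is read and split on the driver.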