I am new to PySpark and I've been pulling my hair out trying to accomplish something I believe is fairly simple. I am trying to do an ETL process where a CSV file is converted to a parquet file. The CSV file has a few simple columns, but one column is a delimited array of integers that I want to expand/unzip into the parquet file. The parquet file is consumed by a .NET Core microservice, which uses a parquet reader to do calculations downstream. To keep this question simple, the structure of the column is:
"geomap" 5:3:7|4:2:1|8:2:78 -> this represents an array of 3 items, it is split at the "|" and then a tuple is build of the values (5,3,7), (4,2,1), (8,2,78)
I have tried various processes and schemas and I can't get this correct. Via a UDF I am creating either a list of lists or a list of tuples, but I can't get the schema right or unzip/explode the data into the parquet write operation. I either get nulls, an error, or other problems. Do I need to approach this differently? Relevant code is below. I am just showing the problem column for simplicity since I have the rest working. This is my first PySpark attempt, so apologies for missing something obvious:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType, StructField, StructType

def convert_geo(geo):
    return [tuple(x.split(':')) for x in geo.split('|')]
compression_type = 'snappy'
schema = ArrayType(StructType([
    StructField("c1", IntegerType(), False),
    StructField("c2", IntegerType(), False),
    StructField("c3", IntegerType(), False)
]))
spark_convert_geo = udf(convert_geo, schema)
source_path = '...path to csv'
destination_path = 'path for generated parquet file'
df = (spark.read
      .option('delimiter', ',')
      .option('header', 'true')
      .csv(source_path)
      .withColumn("geomap", spark_convert_geo(col('geomap'))))
df.write.mode("overwrite").format('parquet').option('compression', compression_type).save(destination_path)
EDIT: Per request, adding the printSchema() output; I'm not sure what's wrong here either. I still can't get the string-split values to show up or render properly. This contains all the columns. I do see the c1, c2, and c3 struct field names...
root
 |-- lrsegid: integer (nullable = true)
 |-- loadsourceid: integer (nullable = true)
 |-- agencyid: integer (nullable = true)
 |-- acres: float (nullable = true)
 |-- sourcemap: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- geomap: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- c1: integer (nullable = false)
 |    |    |-- c2: integer (nullable = false)
 |    |    |-- c3: integer (nullable = false)
The problem is that the convert_geo function returns a list of tuples whose elements are strings rather than the ints declared in the schema. When a UDF's return values don't match its declared return type, Spark produces nulls instead of raising an error, which is why the values never show up. If you modify it as follows it will work:
def convert_geo(geo):
    return [tuple([int(y) for y in x.split(':')]) for x in geo.split('|')]
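With that one change, the schema, UDF registration, and write code from the question work as-is. As an aside, you could also avoid the Python UDF entirely (UDFs ship every row through the Python worker, which is slower than built-in expressions). This is a minimal sketch of a built-in alternative, assuming Spark 3.1+ for pyspark.sql.functions.transform and the df from the question:

from pyspark.sql import functions as F

# split() takes a regex pattern, so the literal pipe must be escaped
df = df.withColumn(
    "geomap",
    F.transform(
        F.split("geomap", r"\|"),
        lambda p: F.struct(
            F.split(p, ":")[0].cast("int").alias("c1"),
            F.split(p, ":")[1].cast("int").alias("c2"),
            F.split(p, ":")[2].cast("int").alias("c3"),
        ),
    ),
)

Either way, df.printSchema() should report geomap as array<struct<c1:int,c2:int,c3:int>>, and the parquet write stays exactly as you have it.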