I am trying to transform data in a PySpark DataFrame in order to export it. I have arrays like "[1,2,3]", and I need to transform them into strings like "(1;2;3)": the array elements need to be joined with semicolons, and parentheses added at the beginning and end. I also need to apply some regex replacements.
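For one array column, the transformation I am after would conceptually be something like this (just a sketch using pyspark.sql.functions, with "array1" standing in for the array column):

from pyspark.sql import functions as F

# join the elements with ";" and wrap the result in parentheses
df = df.withColumn(
    "array1",
    F.concat(F.lit("("), F.concat_ws(";", F.col("array1")), F.lit(")")),
)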
Sample input:
col1 | array1 | col2 |
---|---|---|
"First" | [1,2,3] | "a~" |
"Second" | [4,5,6] | "b" |
Expected output:
col1 | array1 | col2 |
---|---|---|
"First" | "(1;2;3)" | "a" |
"Second" | "(4;5;6)" | "b" |
Actual (wrong) output:
col1 | array1 | col2 |
---|---|---|
"First" | "(4;5;6)" | "a" |
"Second" | "(X;X;X)" | "b" |
where "(X;X;X)" would be data from another row.
I tried the following code:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat, concat_ws, lit
from pyspark.sql.types import ArrayType

for c in df.columns:
    if isinstance(df.schema[c].dataType, ArrayType):
        print(c)
        df = df.withColumn(c, concat_ws(';', col(c))).withColumn(c, concat(lit("("), col(c), lit(")"))).withColumn(c, F.regexp_replace(c, '\n|\r|\\n|\\r|~|\\(\\)|', ''))
    else:
        df = df.withColumn(c, F.regexp_replace(c, '\n|\r|\\n|\\r|~|', ''))
I loop over every column of the PySpark DataFrame. If the column is an array, I concatenate it and apply the regexp; if not, I only apply the regexp.
The issue is that after those operations, the data is shuffled across my columns, and I don't get the expected data. For example, if column d had "b" as a value for a given row, it would now be "c" or "d" for the same row.
How can I apply those transformations without "shuffling" the data? I am not sure that the way I currently loop over each column is good practice with PySpark, but I really need to apply my function to every column, and check whether or not it is an array to adapt the processing.
Based on your data, here is the dataframe:
a = [
("First", [1, 2, 3], "a~"),
("Second", [4, 5, 6], "b"),
]
b = "col1 array1 col2".split()
df = spark.createDataFrame(a, b)
df.show()
+------+---------+----+
| col1| array1|col2|
+------+---------+----+
| First|[1, 2, 3]| a~|
|Second|[4, 5, 6]| b|
+------+---------+----+
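As a sanity check, df.printSchema() confirms that array1 really is an ArrayType column, which is what the isinstance test in your loop relies on. It should print something like:

df.printSchema()
# root
#  |-- col1: string (nullable = true)
#  |-- array1: array (nullable = true)
#  |    |-- element: long (containsNull = true)
#  |-- col2: string (nullable = true)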
I tried your code. Nothing wrong:
from pyspark.sql import functions as F, types as T

for c in df.columns:
    if isinstance(df.schema[c].dataType, T.ArrayType):
        print(c)
        df = (
            df.withColumn(c, F.concat_ws(";", F.col(c)))
            .withColumn(c, F.concat(F.lit("("), F.col(c), F.lit(")")))
            .withColumn(c, F.regexp_replace(c, "\n|\r|\\n|\\r|~|\\(\\)|", ""))
        )
    else:
        df = df.withColumn(c, F.regexp_replace(c, "\n|\r|\\n|\\r|~|", ""))
df.show()
+------+-------+----+
| col1| array1|col2|
+------+-------+----+
| First|(1;2;3)| a|
|Second|(4;5;6)| b|
+------+-------+----+
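As for your last question: looping with withColumn works, but each call adds a projection to the query plan, and the usual PySpark idiom is to build one expression per column and apply them all in a single select. Here is a minimal sketch of that idea (transform_column is a helper name introduced here for illustration; it reproduces your transformations, and drops the trailing "|" from the patterns, which only ever matches the empty string):

from pyspark.sql import functions as F, types as T

def transform_column(c):
    # build the cleaned-up expression for column c, keeping its name
    if isinstance(df.schema[c].dataType, T.ArrayType):
        # join the array with ";" and wrap it in parentheses
        expr = F.concat(F.lit("("), F.concat_ws(";", F.col(c)), F.lit(")"))
        pattern = "\n|\r|\\n|\\r|~|\\(\\)"
    else:
        expr = F.col(c)
        pattern = "\n|\r|\\n|\\r|~"
    return F.regexp_replace(expr, pattern, "").alias(c)

df = df.select([transform_column(c) for c in df.columns])

Either way, these transformations are applied row by row and cannot move values between rows, so the "shuffling" you observed most likely comes from somewhere else in your pipeline.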