
Pyspark - Column transformation causes data shuffle


I am trying to transform data in a PySpark dataframe in order to export it. I have arrays like "[1,2,3]" that I need to transform into strings like "(1;2;3)". The array elements need to be concatenated, and parentheses should be added at the beginning and end of the result. I also need to apply some regex replacements.

Sample input would be:

col1      array1    col2
"First"   [1,2,3]   "a~"
"Second"  [4,5,6]   "b"

Expected output:

col1      array1     col2
"First"   "(1;2;3)"  "a"
"Second"  "(4;5;6)"  "b"

Actual (wrong) output:

col1      array1     col2
"First"   "(4;5;6)"  "a"
"Second"  "(X;X;X)"  "b"

where "(X;X;X)" is data taken from another row.

I tried the following code:

from pyspark.sql.functions import col, concat, concat_ws, lit
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType

for c in df.columns:
    if isinstance(df.schema[c].dataType, ArrayType):
        print(c)
        df = df.withColumn(c, concat_ws(';', col(c))) \
               .withColumn(c, concat(lit("("), col(c), lit(")"))) \
               .withColumn(c, F.regexp_replace(c, '\n|\r|\\n|\\r|~|\\(\\)|', ''))
    else:
        df = df.withColumn(c, F.regexp_replace(c, '\n|\r|\\n|\\r|~|', ''))

I loop over every column of the PySpark dataframe. If the column is an array, I concatenate it and apply the regex; otherwise, I only apply the regex.

The issue is that after those operations, the data in my columns is shuffled and I don't get the expected output. For example, if column d had "b" as its value for a given row, it would now be "c" or "d" for the same row.

How can I apply those transformations without "shuffling" the data? I am not sure that looping over each column this way is good practice with PySpark, but I really need to apply my function to every column and check whether it is an array or not to adapt the processing.


Solution

  • Based on your data, here is the dataframe:

    a = [
        ("First", [1, 2, 3], "a~"),
        ("Second", [4, 5, 6], "b"),
    ]
    
    b = "col1   array1  col2".split()
    
    df = spark.createDataFrame(a,b)
    
    df.show()
    
    +------+---------+----+
    |  col1|   array1|col2|
    +------+---------+----+
    | First|[1, 2, 3]|  a~|
    |Second|[4, 5, 6]|   b|
    +------+---------+----+
    

    I tried your code. Nothing is wrong with it:

    from pyspark.sql import functions as F, types as T
    
    for c in df.columns:
        if isinstance(df.schema[c].dataType, T.ArrayType):
            print(c)
            df = (
                df.withColumn(c, F.concat_ws(";", F.col(c)))
                .withColumn(c, F.concat(F.lit("("), F.col(c), F.lit(")")))
                .withColumn(c, F.regexp_replace(c, "\n|\r|\\n|\\r|~|\\(\\)|", ""))
            )
        else:
            df = df.withColumn(c, F.regexp_replace(c, "\n|\r|\\n|\\r|~|", ""))
    
    df.show()
    
    +------+-------+----+
    |  col1| array1|col2|
    +------+-------+----+
    | First|(1;2;3)|   a|
    |Second|(4;5;6)|   b|
    +------+-------+----+
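
    If you would rather avoid chaining withColumn calls inside a Python loop (the practice you were unsure about), the same logic can be built as one expression per column and applied in a single select. This is only a sketch under the same assumptions as above (same column names and regex patterns); df2 is just an illustrative variable name:

    from pyspark.sql import functions as F, types as T

    # Build one output expression per column instead of chaining withColumn.
    exprs = []
    for c in df.columns:
        if isinstance(df.schema[c].dataType, T.ArrayType):
            # Join the array with ";", wrap it in parentheses, then clean up.
            e = F.concat(F.lit("("), F.concat_ws(";", F.col(c)), F.lit(")"))
            e = F.regexp_replace(e, "\n|\r|\\n|\\r|~|\\(\\)|", "")
        else:
            e = F.regexp_replace(F.col(c), "\n|\r|\\n|\\r|~|", "")
        exprs.append(e.alias(c))

    df2 = df.select(*exprs)
    df2.show()

    Applying everything in one select keeps the query plan flatter than many chained withColumn calls; the result should be the same as with the loop above.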