
Python Script to Pyspark Script


I am unable to convert this Python logic into a PySpark script.

    data = [(1, 'N'),
            (2, 'N'),
            (3, 'N'),
            (4, 'Y'),
            (5, 'Y'),
            (6, 'N'),
            (7, 'N'),
            (8, 'Y'),
            (9, 'Y'),
            (10, 'N')]
    modified_data = []
    new_col = 0  # Initialize new_col
    for id_, flag in data:
        if flag == 'N':
            new_col = id_ - 1
        modified_data.append((id_, flag, new_col))
    print(modified_data)

The result should be:

[(1, 'N', 0), (2, 'N', 1), (3, 'N', 2), (4, 'Y', 2), (5, 'Y', 2), (6, 'N', 5), (7, 'N', 6), (8, 'Y', 6), (9, 'Y', 6), (10, 'N', 9)]

Here, data is a DataFrame. I need to add new_col to it with the values shown above.


Solution

  • Check this out

    import pyspark.sql.functions as f
    from pyspark.sql.window import Window
    
    data = [
        (1, 'N'),
        (2, 'N'),
        (3, 'N'),
        (4, 'Y'),
        (5, 'Y'),
        (6, 'N'),
        (7, 'N'),
        (8, 'Y'),
        (9, 'Y'),
        (10, 'N')
    ]
    df = spark.createDataFrame(data, ['id', 'flag'])
    df = (
        df
        # id - 1 on 'N' rows; null on 'Y' rows (no otherwise clause)
        .withColumn('new_col', f.when(f.col('flag') == 'N', f.col('id') - 1))
        # fill each null with the last non-null value, ordered by id
        .withColumn(
            'new_col',
            f.when(
                f.col('new_col').isNull(),
                f.last(f.col('new_col'), True).over(Window.orderBy('id'))
            ).otherwise(f.col('new_col'))
        )
    )
    
    df.show()
    

    And the output is:

    +---+----+-------+                                                              
    | id|flag|new_col|
    +---+----+-------+
    |  1|   N|      0|
    |  2|   N|      1|
    |  3|   N|      2|
    |  4|   Y|      2|
    |  5|   Y|      2|
    |  6|   N|      5|
    |  7|   N|      6|
    |  8|   Y|      6|
    |  9|   Y|      6|
    | 10|   N|      9|
    +---+----+-------+