Tags: python, pandas, dataframe, pyspark, pyspark-pandas

Reshape the dataframe using PySpark/Pandas according to custom logic


I have a dataframe with a structure similar to the one shown below:

  INP_A  INP_B  OUTP_A  OUTP_B  LVL_NUM  BTCH_NUM
0   m1     b1                         0         1
1   m12    b12    m1      b1          1         1
2   m13    b13    m1      b1          1         1
3   m21    b21    m12     b12         2         1
4   x1     b1                         0         2
5   x12    b12    x1      b1          1         2
6   x13    b13    x12     b12         2         2
7   x21    b21    x13     b13         3         2

The desired outcome is a reshaped (long-to-wide) dataframe for each unique BTCH_NUM value, written to a parquet file whose name includes that BTCH_NUM value.

The desired structure of the DataFrame for each BTCH_NUM (each of which needs to be written to a separate file) is as follows:

REQUIRED OUTPUT

For BTCH_NUM = 1:

  INP_A.1 INP_B.1 OUTP_A.1 OUTP_B.1 INP_A.2 INP_B.2 OUTP_A.2 OUTP_B.2
0     m12     b12       m1       b1     m21     b21      m12      b12
1     m13     b13       m1       b1

For BTCH_NUM = 2:

  INP_A.1 INP_B.1 OUTP_A.1 OUTP_B.1 INP_A.2 INP_B.2 OUTP_A.2 OUTP_B.2 INP_A.3 INP_B.3 OUTP_A.3 OUTP_B.3
0     x12     b12       x1       b1     x13     b13      x12      b12     x21     b21      x13      b13

Now, the logic behind this reshaping is that, for each unique LVL_NUM value, the old columns are converted into new columns with the LVL_NUM as a suffix (e.g., INP_A becomes INP_A.1 for LVL_NUM = 1, and so on).

Some info about the data: within each unique BTCH_NUM, the data is populated such that the OUTP_A and OUTP_B values of a higher LVL_NUM are the INP_A and INP_B values of the next lower LVL_NUM. For example, m1 and b1 are the OUTP_A and OUTP_B values at LVL_NUM = 1 and the INP_A and INP_B values at LVL_NUM = 0.

What have I tried so far? (After reading tons of pandas/PySpark documentation and Stack Overflow answers.)

# I am using pandas here, but I would prefer a PySpark implementation;
# otherwise I can do with pandas for now
import pandas as pd

# Dummy data (Actually, I am using parquet files containing dataframes)
data = {
    'INP_A': ['m1','m12', 'm13','m21','x1','x12', 'x13','x21'],
    'INP_B': ['b1', 'b12', 'b13', 'b21','b1', 'b12', 'b13', 'b21'],
    'OUTP_A': ['', 'm1', 'm1', 'm12','', 'x1', 'x12', 'x13'],
    'OUTP_B': ['', 'b1', 'b1', 'b12','', 'b1', 'b12', 'b13'],
    'LVL_NUM': [0,1,1,2,0,1,2,3],
    'BTCH_NUM': [1,1,1,1,2,2,2,2]
}

# Convert to a pandas dataframe
df = pd.DataFrame(data)

# Group the df into multiple dataframes w.r.t. the BTCH_NUM
grouped_dfs = [i[1] for i in df.groupby("BTCH_NUM")]

# Iterate over each df, transform into long-to-wide format and print the result
# To do: Instead of printing, save the df in a new file
for i in grouped_dfs:
    i = i.drop(columns='BTCH_NUM')
    i = i[i['LVL_NUM'] > 0]  # drop the LVL_NUM = 0 rows
    df_out = i.set_index('LVL_NUM')
    df_out = df_out.stack()  # long Series indexed by (LVL_NUM, column name)
    df_out.index = df_out.index.map('{0[1]}.{0[0]}'.format)  # (1, 'INP_A') -> 'INP_A.1'
    df_out = df_out.to_frame().T  # single wide row with duplicated column labels
    s = df_out.columns.to_series()
    df_out.columns = [df_out.columns, s.groupby(s).cumcount()]  # add a counter level to deduplicate labels
    df_out = (df_out.stack()  # one output row per counter value
              .sort_index(level=1)
              .fillna('')
              .reset_index(level=1, drop=True)
              .reset_index()
              .drop(columns='index'))
    print(df_out)
    print(' ')

If you run the above code in Jupyter, you'll get the following output:

ACTUAL OUTPUT

  INP_A.1 INP_A.2 INP_B.1 INP_B.2 OUTP_A.1 OUTP_A.2 OUTP_B.1 OUTP_B.2
0     m12     m21     b12     b21       m1      m12       b1      b12
1     m13             b13               m1                b1

  INP_A.1 INP_A.2 INP_A.3 INP_B.1 INP_B.2 INP_B.3 OUTP_A.1 OUTP_A.2 OUTP_A.3 OUTP_B.1 OUTP_B.2 OUTP_B.3
0     x12     x13     x21     b12     b13     b21       x1      x12      x13       b1      b12      b13

If you compare the actual output with the required output, you'll notice that the only difference is the column order. That is the first thing I need help with: the columns currently come out in sorted order, but I need them in the "required output" order. And if there are any better approaches to the whole reshaping process, I am open to those too. Thank you so much for your time if you read this far.
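For reference, one way to force the required order on the wide frame that the loop produces is a custom sort key over the column labels. The `field_order` list and `sort_key` helper below are illustrative names, not pandas API, and the frame is a hypothetical one-batch result hard-coded for the sketch:

```python
import pandas as pd

# Hypothetical wide frame with columns in alphabetically sorted order,
# as produced by the loop above for BTCH_NUM = 1
df_out = pd.DataFrame([['m12', 'm21', 'b12', 'b21', 'm1', 'm12', 'b1', 'b12']],
                      columns=['INP_A.1', 'INP_A.2', 'INP_B.1', 'INP_B.2',
                               'OUTP_A.1', 'OUTP_A.2', 'OUTP_B.1', 'OUTP_B.2'])

# Desired field order within each level suffix
field_order = ['INP_A', 'INP_B', 'OUTP_A', 'OUTP_B']

def sort_key(col):
    # Split "INP_A.2" into ("INP_A", "2"); order by level first, then field
    name, lvl = col.rsplit('.', 1)
    return (int(lvl), field_order.index(name))

df_out = df_out[sorted(df_out.columns, key=sort_key)]
print(list(df_out.columns))
# ['INP_A.1', 'INP_B.1', 'OUTP_A.1', 'OUTP_B.1', 'INP_A.2', 'INP_B.2', 'OUTP_A.2', 'OUTP_B.2']
```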

PS: I am very new to PySpark/pandas/Python/asking questions on Stack Overflow, so this question may not be perfectly written. Please ask if anything about the reshaping (or any part in general) is unclear.


Solution

  • You can use a pivot after reworking the DataFrame:

    df2 = (df
     # idx: position of the row within its (BTCH_NUM, LVL_NUM) group,
     # i.e. which "chain" inside the batch the row belongs to
     .assign(idx=df.groupby(['BTCH_NUM', 'LVL_NUM']).cumcount(),
             # renumber LVL_NUM as 1, 2, ... within each (BTCH_NUM, idx) chain
             LVL_NUM=lambda d: d.groupby(['BTCH_NUM', 'idx']).cumcount().add(1)
            )
     # one row per (BTCH_NUM, idx), one column per (field, LVL_NUM) pair
     .pivot(index=['BTCH_NUM', 'idx'], columns='LVL_NUM')
     # group the columns by LVL_NUM while preserving the original field order
     .sort_index(level=1, axis=1, sort_remaining=False)
     # flatten the column MultiIndex to labels like 'INP_A_1'
     .pipe(lambda d: d.set_axis(d.columns.map(lambda x: f'{x[0]}_{x[1]}'), axis=1))
    )
    

    output:

                 INP_A_1 INP_B_1 OUTP_A_1 OUTP_B_1 INP_A_2 INP_B_2 OUTP_A_2 OUTP_B_2 INP_A_3 INP_B_3 OUTP_A_3 OUTP_B_3 INP_A_4 INP_B_4 OUTP_A_4 OUTP_B_4
    BTCH_NUM idx                                                                                                                                        
    1        0        m1      b1                       m12     b12       m1       b1     m21     b21      m12      b12     NaN     NaN      NaN      NaN
             1       m13     b13       m1       b1     NaN     NaN      NaN      NaN     NaN     NaN      NaN      NaN     NaN     NaN      NaN      NaN
    2        0        x1      b1                       x12     b12       x1       b1     x13     b13      x12      b12     x21     b21      x13      b13
    

    Then you get the individual sub-DataFrames by slicing, for example for BTCH_NUM 1:

    df2.loc[1]