Tags: python, pyspark, spark-koalas

How to pivot string column using pandas api on spark


I am attempting to convert some code my organization uses from pandas DataFrames to pandas API on Spark DataFrames. We have run into a problem converting our pivot functions: pandas API on Spark does not allow pivot operations on string columns. I have recreated a small example to demonstrate the problem. The following pandas operation completes successfully:

import pandas as pd

pd_df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two',
                           'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': [10, 11, 12, 13, 14, 15]})                   

pd_df = pd_df.pivot(columns='bar').sort_index() 

pd_df.head()

The output is:

     foo            baz             zoo
bar    A    B    C    A    B    C     A     B     C
0    one  NaN  NaN  1.0  NaN  NaN  10.0   NaN   NaN
1    NaN  one  NaN  NaN  2.0  NaN   NaN  11.0   NaN
2    NaN  NaN  one  NaN  NaN  3.0   NaN   NaN  12.0
3    two  NaN  NaN  4.0  NaN  NaN  13.0   NaN   NaN
4    NaN  two  NaN  NaN  5.0  NaN   NaN  14.0   NaN
5    NaN  NaN  two  NaN  NaN  6.0   NaN   NaN  15.0

Note that the bar / A / B / C row is the second level of our MultiIndex column header, not a row of data.
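One way to confirm that the bar / A / B / C line belongs to the header is to inspect the pivoted frame's columns: they form a two-level MultiIndex whose second level is named 'bar'. A minimal check in plain pandas on the same example data (the variable names df and pivoted are just for illustration):

```python
import pandas as pd

df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': [10, 11, 12, 13, 14, 15]})

pivoted = df.pivot(columns='bar').sort_index()

# Level 0 is the original value column, level 1 is the pivot value (named 'bar')
print(list(pivoted.columns.names))      # [None, 'bar']
print(pivoted.columns.tolist()[:3])     # [('foo', 'A'), ('foo', 'B'), ('foo', 'C')]

# Selecting a level-0 label returns its A/B/C sub-columns
print(pivoted['foo'].columns.tolist())  # ['A', 'B', 'C']
```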

While the pandas version runs without a problem, our pandas API on Spark pivot fails with the error "values should be a numeric type". This is because our 'foo' column contains strings.

import pyspark.pandas as ps

ps_df = ps.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two',
                           'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': [10, 11, 12, 13, 14, 15]})                   

ps_df = ps_df.pivot(columns='bar', values=['foo', 'baz', 'zoo']).sort_index() 

ps_df.head()
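To see up front which value columns would trip this check, you can test each one with pandas' is_numeric_dtype, the same helper the answer's function uses. This is a small sketch on the question's example data; value_cols is simply the values list passed to pivot above, and it assumes, based on the error message, that any non-numeric value column triggers the failure.

```python
import pandas as pd
from pandas.api.types import is_numeric_dtype

df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': [10, 11, 12, 13, 14, 15]})

value_cols = ['foo', 'baz', 'zoo']

# Value columns with a non-numeric dtype: the likely cause of
# "values should be a numeric type" in the pandas-on-Spark pivot
non_numeric = [c for c in value_cols if not is_numeric_dtype(df[c].dtype)]
print(non_numeric)  # ['foo']
```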

I am trying to figure out how to reproduce the output of the pandas pivot call using pandas API on Spark. Is there any way to do this? I have seen this question (Pivot String column on Pyspark Dataframe), but it uses PySpark rather than pandas API on Spark.

I want to stick to pandas API on Spark as much as possible, because much of our team isn't familiar with Spark, so I was hoping to find a solution that doesn't require it. Does anyone know a way to perform this operation in pandas API on Spark?


Solution

  • I was able to work around this by encoding the non-numeric columns as integers. For each non-numeric column, I create a dictionary that assigns an integer code to each of the column's unique values and replace the strings with those codes. From there we can use the regular pivot function and then map the codes back to the original values. This has only been tested with one pivot column, which was sufficient for my purposes. It has also only been tested with a column passed as index, so that argument may be necessary too (with the default index=None, the rename_dict[index] lookup raises a KeyError).

    Note that the runtime was quite slow, mainly due to the dictionary creation step: it took about 20 minutes to create 4 dictionaries from our non-numeric columns, with a total of about 4000 elements across all of them. The remaining steps ran in under a minute, so apart from this the approach works and gives the desired answer.

    Also note that we rename the columns so their names do not contain underscores, then restore the original names at the end. The pivot operation does not like column names with underscores, so the temporary rename avoids the problem.

    import pyspark.pandas as ps
    from pandas.api.types import is_numeric_dtype
    
    def pivot_pandas_on_spark_df_with_string(ps_df, pivot_col, values_cols, index=None):
      # Work on a copy so the caller's DataFrame keeps its original names and values
      ps_df = ps_df.copy()
      original_index = index
    
      # Pivot does not like column names with underscores, so rename every column
      # to a plain number ("1", "2", ...) and keep the reverse mapping for the end
      rename_dict = {original_col: str(i) for i, original_col in enumerate(ps_df.columns, start=1)}
      original_name_dict = {v: k for k, v in rename_dict.items()}
      ps_df.rename(columns=rename_dict, inplace=True)
    
      renamed_index_col = rename_dict[index]  # raises KeyError if index is None
      renamed_pivot_col = rename_dict[pivot_col]
      renamed_value_cols = [rename_dict[value_col] for value_col in values_cols]
    
      # Replace each non-numeric value column with integer codes and remember code -> value
      non_numeric_cols = []
      mapping_dicts = {}
      for col in renamed_value_cols:
        if not is_numeric_dtype(ps_df[col].dtype):
          non_numeric_cols.append(col)
          codes, uniques = ps_df[col].factorize()
          # Code k is the position of its value in the small, local uniques Index, so the
          # dictionary is built without looking up the distributed codes Series row by row.
          # Missing values get code -1, which is left unmapped and comes back as missing.
          mapping_dicts[col] = dict(enumerate(uniques))
          ps_df[col] = codes
    
      pivoted_df = ps_df.pivot(index=renamed_index_col, columns=renamed_pivot_col,
                               values=renamed_value_cols).sort_index()
    
      # Map the integer codes in every pivoted sub-column back to the original values
      for col in non_numeric_cols:
        mapping_dict = mapping_dicts[col]
        for sub_col in pivoted_df[col].columns:
          pivoted_df[(col, sub_col)] = pivoted_df[(col, sub_col)].map(mapping_dict)
    
      # Restore the original index name, column names and column-axis name
      pivoted_df.index.set_names(names=original_index, inplace=True)
      pivoted_df.rename(columns=original_name_dict, inplace=True)
      pivoted_df.rename_axis(columns={renamed_pivot_col: pivot_col}, inplace=True)
    
      return pivoted_df
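The encode, pivot, decode idea itself does not depend on Spark, so it can be tried out in plain pandas without a Spark session. Plain pandas does not actually need the workaround (its pivot accepts strings, as shown in the question), which makes it a convenient reference to compare against. This is a sketch, not the answer's function: pivot_via_integer_codes is a hypothetical name, it uses pandas.factorize rather than the pandas-on-Spark Series.factorize, and it assumes an explicit index column (here a made-up 'idx').

```python
import pandas as pd
from pandas.api.types import is_numeric_dtype


def pivot_via_integer_codes(df, index, columns, values):
    """Hypothetical helper: pivot after temporarily encoding non-numeric value columns as ints."""
    df = df.copy()  # leave the caller's frame untouched
    mappings = {}
    for col in values:
        if not is_numeric_dtype(df[col].dtype):
            # pd.factorize: each code is the position of the value in uniques (-1 for missing)
            codes, uniques = pd.factorize(df[col])
            # Code k maps to uniques[k]; -1 is deliberately left unmapped, so it decodes to NaN
            mappings[col] = dict(enumerate(uniques))
            df[col] = codes
    # All value columns are numeric now, so the pivot only ever sees numbers
    pivoted = df.pivot(index=index, columns=columns, values=values).sort_index()
    # Decode every pivoted sub-column of an encoded column back to the original values
    for col, mapping in mappings.items():
        for sub_col in pivoted[col].columns:
            pivoted[(col, sub_col)] = pivoted[(col, sub_col)].map(mapping)
    return pivoted


df = pd.DataFrame({'idx': range(6),
                   'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                   'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                   'baz': [1, 2, 3, 4, 5, 6],
                   'zoo': [10, 11, 12, 13, 14, 15]})

result = pivot_via_integer_codes(df, index='idx', columns='bar', values=['foo', 'baz', 'zoo'])
print(result.loc[0, ('foo', 'A')], result.loc[1, ('baz', 'B')])  # one 2.0
```

Comparing result with df.pivot(index='idx', columns='bar', values=['foo', 'baz', 'zoo']) is a quick way to check that the round trip through integer codes loses nothing before moving the same steps to pandas API on Spark.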