I am converting some code my organization uses from pandas DataFrames to pandas API on Spark DataFrames. We have run into a problem converting our pivot functions: pandas API on Spark does not allow pivot operations on string columns. I have recreated a small example to demonstrate the problem. The following pandas operation completes successfully.
import pandas as pd

pd_df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                      'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                      'baz': [1, 2, 3, 4, 5, 6],
                      'zoo': [10, 11, 12, 13, 14, 15]})
pd_df = pd_df.pivot(columns='bar').sort_index()
pd_df.head()
The output for this is the following:
index | foo | - | - | baz | - | - | zoo | - | - |
---|---|---|---|---|---|---|---|---|---|
bar | A | B | C | A | B | C | A | B | C |
0 | one | NaN | NaN | 1.0 | NaN | NaN | 10.0 | NaN | NaN |
1 | NaN | one | NaN | NaN | 2.0 | NaN | NaN | 11.0 | NaN |
2 | NaN | NaN | one | NaN | NaN | 3.0 | NaN | NaN | 12.0 |
3 | two | NaN | NaN | 4.0 | NaN | NaN | 13.0 | NaN | NaN |
4 | NaN | two | NaN | NaN | 5.0 | NaN | NaN | 14.0 | NaN |
5 | NaN | NaN | two | NaN | NaN | 6.0 | NaN | NaN | 15.0 |
Note that the bar, A, B, C row represents our multi-indexed column names; it is part of the header, not the data.
While the pandas version runs without a problem, our pandas API on Spark pivot fails with the error "values should be a numeric type". This is because our 'foo' column contains strings.
import pyspark.pandas as ps

ps_df = ps.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                      'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                      'baz': [1, 2, 3, 4, 5, 6],
                      'zoo': [10, 11, 12, 13, 14, 15]})
ps_df = ps_df.pivot(columns='bar', values=['foo', 'baz', 'zoo']).sort_index()
ps_df.head()
I am trying to figure out how to reproduce the output of the pandas pivot call using pandas API on Spark. Is there any way to do this? I have seen this question (Pivot String column on Pyspark Dataframe), but it uses pyspark rather than pandas API on Spark.
I want to stick to pandas API on Spark as much as possible, since much of our team isn't familiar with Spark, so I was hoping to find a solution that does not require it. Does anyone know a way to perform this operation in pandas API on Spark?
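For reference, the plain pyspark route from that question looks roughly like this (a sketch, with a synthetic row id so the shape resembles the pandas pivot); it works on string columns, but it abandons the pandas API entirely, which is what I am trying to avoid:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [('one', 'A', 1, 10), ('one', 'B', 2, 11), ('two', 'A', 4, 13)],
    ['foo', 'bar', 'baz', 'zoo'])
# groupBy().pivot() accepts string columns when paired with an aggregate like first()
sdf = sdf.withColumn('row', F.monotonically_increasing_id())
pivoted = sdf.groupBy('row').pivot('bar').agg(
    F.first('foo'), F.first('baz'), F.first('zoo'))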
I was able to work around this by creating a dictionary for each string column that maps each unique value in that column to an integer code. After encoding, we can use the regular pivot function and then convert back. This has only been tested with one pivot column, which was sufficient for my purposes. It was also only tested with a named index column, so that may be necessary too.
Note that the runtime is quite slow, mainly due to the dictionary-creation step. It took about 20 minutes to create 4 dictionaries from our non-numeric columns, with a total of about 4000 elements across all dictionaries. The rest of the steps run in under a minute, so aside from this it works and produces the desired answer.
Also note that we rename the columns so they do not contain underscores, then restore the original names at the end. The pivot operation does not like column names with underscores, so the rename avoids this.
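The core of the workaround is Series.factorize, which replaces a string column with integer codes and also returns the unique values, so we can build a code-to-value dictionary and decode after the pivot. A minimal sketch of that round trip on a single column:

import pyspark.pandas as ps

s = ps.Series(['one', 'one', 'two'])
codes, uniques = s.factorize()      # codes are positions into uniques
decode = dict(enumerate(uniques))   # e.g. {0: 'one', 1: 'two'}
decoded = codes.map(decode)         # round-trips back to the original strings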
import pyspark.pandas as ps
from pandas.api.types import is_numeric_dtype

def pivot_pandas_on_spark_df_with_string(ps_df, pivot_col, values_cols, index=None):
    original_index = index
    # Pivot does not like columns with underscores in the name, so we rename
    # every column to a plain numeric string ('1', '2', ...) to be safe.
    temp_column_names = list(range(1, len(ps_df.columns) + 1))
    original_col_names = ps_df.columns
    rename_dict = {original_col: str(temp_col)
                   for original_col, temp_col in zip(original_col_names, temp_column_names)}
    ps_df.rename(columns=rename_dict, inplace=True)
    original_name_dict = {v: k for k, v in rename_dict.items()}
    renamed_index_col = rename_dict[index]
    renamed_pivot_col = rename_dict[pivot_col]
    # Factorize every non-numeric column (other than the index and pivot
    # columns) into integer codes, keeping a code -> original value mapping
    # so we can decode after the pivot.
    non_numeric_cols = []
    mapping_dicts = {}
    for col in ps_df.columns:
        if not is_numeric_dtype(ps_df[col]) and col != renamed_pivot_col and col != renamed_index_col:
            non_numeric_cols.append(col)
            codes, uniques = ps_df[col].factorize()
            mapping_dicts[col] = dict(enumerate(uniques))  # codes are positions into uniques
            ps_df[col] = codes
    renamed_value_cols = [rename_dict[value_col] for value_col in values_cols]
    pivoted_df = ps_df.pivot(index=renamed_index_col, columns=renamed_pivot_col,
                             values=renamed_value_cols).sort_index()
    # Restore the original column names on the input frame; ps_df is passed by
    # reference, so the caller would otherwise see the temporary names.
    ps_df.rename(columns=original_name_dict, inplace=True)
    # Map the factorized columns in the pivoted result back to their original
    # string values, one sub-column of the MultiIndex at a time.
    for col in non_numeric_cols:
        sub_df = pivoted_df[col]
        mapping_dict = mapping_dicts[col]
        for sub_col in sub_df.columns:
            pivoted_df[(col, sub_col)] = pivoted_df[(col, sub_col)].map(mapping_dict)
    pivoted_df.index.set_names(names=original_index, inplace=True)
    pivoted_df.rename(columns=original_name_dict, inplace=True)
    pivoted_df.rename_axis(columns={rename_dict[pivot_col]: pivot_col}, inplace=True)
    return pivoted_df
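For example, applied to the frame from the question (a sketch; reset_index() adds the explicit 'index' column the function expects):

ps_df = ps.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                      'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                      'baz': [1, 2, 3, 4, 5, 6],
                      'zoo': [10, 11, 12, 13, 14, 15]}).reset_index()
pivoted = pivot_pandas_on_spark_df_with_string(
    ps_df, pivot_col='bar', values_cols=['foo', 'baz', 'zoo'], index='index')
pivoted.head()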