Tags: apache-spark-sql, spark, koalas

How to include a parameter in a mask or where function on a Koalas dataframe


I have a Koalas dataframe running in Azure Databricks, let's say:

import databricks.koalas as pd

df = pd.DataFrame({'category': ['A', 'A', 'B'],
                   'col1': [1, 2, 3],
                   'col2': [4, 5, 6]},
                  columns=['category', 'col1', 'col2'])

I want to create a new column (currently built inside a function) whose values depend on the values of an existing column (which are finite and known), filling it with elements taken from another dataset's columns. (Why? Because this second dataset contains parameters to include, and it has already been filtered before being used for this data extraction.)
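The params dataframe is not shown here; for concreteness, assume it is another (already filtered) Koalas dataframe holding a single row of parameter values, something like:

    # hypothetical shape of params -- purely illustrative, since the real
    # frame comes from filtering a larger parameter dataset
    params = pd.DataFrame({'column1': [10], 'column2': [20], 'column3': [30]})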

I have tried this (code from within a function that receives df and params):

    return df.assign(col3=df.category.map({
        'A': params['column1'],
        'B': params['column2'],
        'C': params['column3'],
    }))

with error message:

PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as a NumPy array, use 'to_numpy()' instead.

which is weird, as I never call pandas directly anywhere. (Presumably the error comes from Koalas internals: the dict handed to map contains series from another frame rather than scalars, which apparently forces an iteration that Koalas deliberately leaves unimplemented.)

Also I have tried:

    df = df.assign(col3=None)

    return df.col3.mask(df.category == 'A', params['column1']) \
                  .mask(df.category == 'B', params['column2']) \
                  .mask(df.category == 'C', params['column3'])

In the second case, the returned column contains nothing but NaN values (presumably because the replacement values are series from a differently indexed frame, so nothing lines up during the masking).

I have also already set 'compute.ops_on_diff_frames' to True
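
For reference, that option is set through the Koalas config API (still using the pd alias from above):

    # allow operations that combine series/frames with different indexes
    pd.set_option('compute.ops_on_diff_frames', True)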


Solution

  • I found a workaround for this issue, but I am still not clear on why it is needed.

    In my case there are two steps. The first is to reset the index of the filtered parameter dataframe:

        params = orig_params.loc[*whatever filtering may fit here*].reset_index()
    

    Secondly, use just the single filtered element that we want. Without this step, the masking would require a series of matching length; indexing with [0] converts the parameter into a scalar instead. (Presumably the reset_index() above is what makes [0] work reliably: after filtering, the surviving row may no longer carry the label 0.) Like so:

        temp = df.category
        temp = temp.mask(temp == 'A', params['column1'][0]) \
                   .mask(temp == 'B', params['column2'][0]) \
                   .mask(temp == 'C', params['column3'][0])
        df1 = df.assign(col3=temp)
        return df1
    
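    Putting both steps together, here is a minimal end-to-end sketch. The orig_params frame and its filter are made up for illustration, since the real parameter data is not shown:

        import databricks.koalas as pd

        # required for operations that combine differently indexed frames
        pd.set_option('compute.ops_on_diff_frames', True)

        df = pd.DataFrame({'category': ['A', 'A', 'B'],
                           'col1': [1, 2, 3],
                           'col2': [4, 5, 6]},
                          columns=['category', 'col1', 'col2'])

        # hypothetical stand-in for the real parameter data
        orig_params = pd.DataFrame({'column1': [10, 11],
                                    'column2': [20, 21],
                                    'column3': [30, 31]})
        params = orig_params.loc[orig_params['column1'] > 10].reset_index()

        temp = df.category
        temp = temp.mask(temp == 'A', params['column1'][0]) \
                   .mask(temp == 'B', params['column2'][0]) \
                   .mask(temp == 'C', params['column3'][0])
        df1 = df.assign(col3=temp)
        print(df1.to_pandas())  # col3 becomes 11, 11, 21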

    Hopefully this is useful. If anybody has a better solution and/or explanation, please post it.