
PandasNotImplementedError for converted pandas dataframe to Koalas dataframe


I am running into a small issue in my code.

I am converting a line of code from a pandas dataframe to a Koalas dataframe, and I get the following error when the code runs.

# Error Message
PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

# Sample dataframe
import pandas as pd

# Initialize the data as a dict of lists.
data = {'team_code': ['A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam']}

# Create DataFrame
input_df = pd.DataFrame(data)

The original line of code using the pandas dataframe is shown below:

# input_df is a pandas df here

to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])

dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
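
For reference, here is a small pandas-only sketch of what these two lines compute. It assumes the four-row data from Example 2 in the answer below, because the two-row sample has no repeated rows and would simply produce an empty frame. The comparison against `DataFrame.duplicated` is added here for illustration and is not part of the original code.

```python
import pandas as pd

# Four-row sample with one exact repeat (row 3 repeats row 1).
data = {'team_code': ['A1', 'S1', 'A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam', 'RickTeam', 'SusanTeam']}
input_df = pd.DataFrame(data)

# Rows that survive de-duplication (the first occurrence of each pair is kept).
to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])

# Keep only rows whose index label was NOT kept above, i.e. the repeated rows.
dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)

# The same rows can be selected directly with DataFrame.duplicated.
same_rows = input_df[input_df.duplicated(['team_code', 'TeamName'])].reset_index(drop=True)

print(dropped_df)
print(dropped_df.equals(same_rows))
```

In other words, despite its name, `to_remove_df` holds the rows that are kept by de-duplication, and `dropped_df` ends up holding only the later repeats.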
    

I converted the code above to use a Koalas dataframe, as shown below. The code itself is unchanged; the only difference is that input_df is now a Koalas dataframe:

# input_df is now a koalas df here

to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])

dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
    

EDIT

Stack trace on a Databricks cluster with Spark 3.2.0 and koalas==1.8.2

---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
<command-2399235872097642> in <module>
      2 to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])
      3 print(to_remove_df)
----> 4 dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
      5 dropped_df

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    193             start = time.perf_counter()
    194             try:
--> 195                 res = func(*args, **kwargs)
    196                 logger.log_success(
    197                     class_name, function_name, time.perf_counter() - start, signature

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    188         if hasattr(_local, "logging") and _local.logging:
    189             # no need to log since this should be internal call.
--> 190             return func(*args, **kwargs)
    191         _local.logging = True
    192         try:

/databricks/python/lib/python3.8/site-packages/databricks/koalas/base.py in isin(self, values)
   1234             )
   1235 
-> 1236         values = values.tolist() if isinstance(values, np.ndarray) else list(values)
   1237         return self._with_new_scol(self.spark.column.isin(values))
   1238 

/databricks/python/lib/python3.8/site-packages/databricks/koalas/indexes/base.py in __iter__(self)
   2479 
   2480     def __iter__(self):
-> 2481         return MissingPandasLikeIndex.__iter__(self)
   2482 
   2483     def __xor__(self, other):

/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    248     def wrapper(*args, **kwargs):
    249         try:
--> 250             return func(*args, **kwargs)
    251         finally:
    252             logger.log_missing(class_name, function_name, is_deprecated, signature)

/databricks/python/lib/python3.8/site-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
     20 def unsupported_function(class_name, method_name, deprecated=False, reason=""):
     21     def unsupported_function(*args, **kwargs):
---> 22         raise PandasNotImplementedError(
     23             class_name=class_name, method_name=method_name, reason=reason
     24         )

PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

I cannot figure out what I missed in the code above when input_df is a Koalas dataframe. Could someone please help me resolve it?

Thank you to all!


Solution

  • It looks like your filtering step calls __iter__() behind the scenes: the traceback shows isin() running list(values) on the Koalas index, and iterating over a Koalas index is currently not supported.

    I suggest an alternative approach in which you define a custom function and pass your dataframe to it. This way, you should obtain the same rows as with the pandas code. The comments in the function explain each step.

    def my_func(df):
      
      # be sure to create a column with unique identifiers
      df = df.reset_index(drop=True).reset_index()
      
      # create dataframe to be removed
      # the additional dummy column is needed to correctly filter out rows later on
      to_remove_df = df.drop_duplicates(['team_code', 'TeamName'])[['index']]
      to_remove_df = to_remove_df.assign(check = lambda x: 'remove_me')
      
      # merge the two dataframes and remove rows
      merged_df = df.merge(to_remove_df, on='index', how='outer')
      result = merged_df.loc[merged_df['check'].isna()]
      
      # drop unnecessary columns
      result = result.drop(columns=['index', 'check'])
      
      return result
    

    Example 1

    # your data
    import databricks.koalas as ks

    data = {'team_code':['A1', 'S1'],
            'TeamName':['JohnTeam', 'SusanTeam']}
    input_df = ks.DataFrame(data)
    
    
    df = my_func(input_df)
    print(df)
    # Empty DataFrame
    # Columns: [team_code, TeamName]
    # Index: []
    

    Example 2

    # other sample data
    data = {'team_code':['A1', 'S1', 'A1', 'S1'],
            'TeamName':['JohnTeam', 'SusanTeam', 'RickTeam', 'SusanTeam']}
    input_df = ks.DataFrame(data)
    
    
    df = my_func(input_df)
    print(df)
    #   team_code   TeamName
    # 3        S1  SusanTeam
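
As background on the error itself: the traceback shows `isin` calling `list(values)`, and that call is what ends up in the unsupported `__iter__`. The sketch below mimics the mechanism in plain Python so it can be run without Spark. `LazyIndex`, `collect` and the sample labels are hypothetical stand-ins for illustration, not Koalas classes or methods.

```python
class LazyIndex:
    """Hypothetical stand-in for a distributed index that refuses iteration."""

    def __init__(self, labels):
        self._labels = list(labels)

    def __iter__(self):
        # Mirrors the idea in the traceback: iteration is explicitly unsupported.
        raise NotImplementedError("__iter__() is not implemented; collect the data explicitly")

    def collect(self):
        # Explicitly gather the labels into a local Python list.
        return list(self._labels)

    def isin(self, values):
        # Like the `list(values)` call in the traceback, this iterates `values`.
        wanted = set(list(values))
        return [label in wanted for label in self._labels]


all_rows = LazyIndex([0, 1, 2, 3])
kept_rows = LazyIndex([0, 1, 2])

try:
    all_rows.isin(kept_rows)  # list(kept_rows) triggers __iter__ and raises
    failed = False
except NotImplementedError:
    failed = True

# Passing an already-collected list works, because a plain list is iterable.
mask = all_rows.isin(kept_rows.collect())
print(failed, mask)
```

In Koalas itself, collecting an index explicitly (for example with `to_numpy()`, as the error message suggests) loads all of its values into the driver's memory, so it is only reasonable for small data; the merge-based function above avoids that step.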