I have run into a small issue in my code logic.
I am converting a line of code that uses a pandas dataframe so that it uses a Koalas dataframe instead, and I get the following error when the code runs.
# Error Message
PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.
# Sample dataframe
import pandas as pd
# initialize data of lists.
data = {'team_code': ['A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam']}
# Create DataFrame
input_df = pd.DataFrame(data)
The original line of code using the pandas dataframe is shown below:
# input_df is a pandas df here
to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])
dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
I converted the code above to use a Koalas dataframe, as shown below.
The code itself is unchanged. The only difference is that input_df is now a Koalas dataframe:
# input_df is now a koalas df here
to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])
dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
Stack trace on a Databricks cluster with Spark 3.2.0 and koalas==1.8.2:
---------------------------------------------------------------------------
PandasNotImplementedError Traceback (most recent call last)
<command-2399235872097642> in <module>
2 to_remove_df = input_df.drop_duplicates(['team_code', 'TeamName'])
3 print(to_remove_df)
----> 4 dropped_df = input_df[~input_df.index.isin(to_remove_df.index)].copy().reset_index(drop=True)
5 dropped_df
/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
193 start = time.perf_counter()
194 try:
--> 195 res = func(*args, **kwargs)
196 logger.log_success(
197 class_name, function_name, time.perf_counter() - start, signature
/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
188 if hasattr(_local, "logging") and _local.logging:
189 # no need to log since this should be internal call.
--> 190 return func(*args, **kwargs)
191 _local.logging = True
192 try:
/databricks/python/lib/python3.8/site-packages/databricks/koalas/base.py in isin(self, values)
1234 )
1235
-> 1236 values = values.tolist() if isinstance(values, np.ndarray) else list(values)
1237 return self._with_new_scol(self.spark.column.isin(values))
1238
/databricks/python/lib/python3.8/site-packages/databricks/koalas/indexes/base.py in __iter__(self)
2479
2480 def __iter__(self):
-> 2481 return MissingPandasLikeIndex.__iter__(self)
2482
2483 def __xor__(self, other):
/databricks/python/lib/python3.8/site-packages/databricks/koalas/usage_logging/__init__.py in wrapper(*args, **kwargs)
248 def wrapper(*args, **kwargs):
249 try:
--> 250 return func(*args, **kwargs)
251 finally:
252 logger.log_missing(class_name, function_name, is_deprecated, signature)
/databricks/python/lib/python3.8/site-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
20 def unsupported_function(class_name, method_name, deprecated=False, reason=""):
21 def unsupported_function(*args, **kwargs):
---> 22 raise PandasNotImplementedError(
23 class_name=class_name, method_name=method_name, reason=reason
24 )
PandasNotImplementedError: The method `pd.Index.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.
I am not able to figure out what I missed in the code above when using input_df as a Koalas dataframe. Could someone please help me resolve it?
Thank you all!
The traceback shows that isin() calls list(values) on the index you pass in. That invokes Index.__iter__() behind the scenes, which is currently not implemented in Koalas.
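Here is a minimal, dependency-free sketch of that failure mode, modelled on the `list(values)` line in the traceback. `FakeIndex` and `isin_like` are hypothetical stand-ins written for illustration only; they are not Koalas code, and `NotImplementedError` stands in for `PandasNotImplementedError`.

```python
class FakeIndex:
    """Hypothetical stand-in for a Koalas Index: it refuses to be iterated."""

    def __init__(self, values):
        self._values = list(values)

    def __iter__(self):
        # Koalas raises PandasNotImplementedError at this point;
        # NotImplementedError is used here so the sketch needs no libraries.
        raise NotImplementedError("Index.__iter__() is not implemented")

    def to_list(self):
        # Explicitly collect the values into a plain Python list.
        return list(self._values)


def isin_like(column, values):
    """Mimics the traceback line: list(values) has to iterate `values`."""
    values = list(values)  # this is what triggers values.__iter__()
    return [item in values for item in column]


index = FakeIndex([0, 1])

try:
    isin_like([0, 1, 2], index)
    failed = False
except NotImplementedError:
    failed = True

# Passing already-collected values avoids the implicit iteration.
mask = isin_like([0, 1, 2], index.to_list())
print(failed, mask)
```

The error message itself suggests `to_numpy()` as the explicit way to collect the values. Keep in mind that collecting brings every index value to the driver, so that route only suits small data, and the traceback does not tell us whether the rest of the original expression would then work unchanged.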
I suggest an alternative approach: define a custom function that replaces the index-based filter with a merge, and pass your dataframe to it. This way, you should obtain the same rows as with the pandas code. The comments in the function explain each step.
def my_func(df):
    # be sure to create a column with unique identifiers
    df = df.reset_index(drop=True).reset_index()
    # create dataframe to be removed
    # the additional dummy column is needed to correctly filter out rows later on
    to_remove_df = df.drop_duplicates(['team_code', 'TeamName'])[['index']]
    to_remove_df = to_remove_df.assign(check=lambda x: 'remove_me')
    # merge the two dataframes and remove rows
    merged_df = df.merge(to_remove_df, on='index', how='outer')
    result = merged_df.loc[merged_df['check'].isna()]
    # drop unnecessary columns
    result = result.drop(columns=['index', 'check'])
    return result
import databricks.koalas as ks

# your data
data = {'team_code': ['A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam']}
input_df = ks.DataFrame(data)
df = my_func(input_df)
print(df)
# Empty DataFrame
# Columns: [team_code, TeamName]
# Index: []
# other sample data
data = {'team_code': ['A1', 'S1', 'A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam', 'RickTeam', 'SusanTeam']}
input_df = ks.DataFrame(data)
df = my_func(input_df)
print(df)
#   team_code   TeamName
# 3        S1  SusanTeam
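As a sanity check, the merge idea can be compared against the question's original expression in plain pandas, where the index-based filter does work. The sketch below assumes pandas is installed; `original_filter` and `anti_join_filter` are names made up for this example. It uses pandas' `indicator=True` merge option in place of the dummy column, and I am not claiming that Koalas supports that option. Because it only runs pandas, it checks the logic and says nothing about Koalas behaviour.

```python
import pandas as pd


def original_filter(df):
    """The question's pandas logic: keep the rows that drop_duplicates discards."""
    kept = df.drop_duplicates(['team_code', 'TeamName'])
    return df[~df.index.isin(kept.index)].copy().reset_index(drop=True)


def anti_join_filter(df):
    """Merge-based version of the same idea (no index iteration involved)."""
    # add an 'index' column holding a unique id per row
    df = df.reset_index(drop=True).reset_index()
    kept = df.drop_duplicates(['team_code', 'TeamName'])[['index']]
    # indicator=True adds a '_merge' column marking where each row was found
    merged = df.merge(kept, on='index', how='left', indicator=True)
    # rows found only on the left side are the ones drop_duplicates discarded
    result = merged[merged['_merge'] == 'left_only']
    return result.drop(columns=['index', '_merge']).reset_index(drop=True)


data = {'team_code': ['A1', 'S1', 'A1', 'S1'],
        'TeamName': ['JohnTeam', 'SusanTeam', 'RickTeam', 'SusanTeam']}
input_df = pd.DataFrame(data)

expected = original_filter(input_df)
actual = anti_join_filter(input_df)
print(actual)
```

Both functions reset the index at the end, as the question's code does, so the surviving row is labelled 0 here and not 3 as in the output above.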