Tags: pandas, pyspark, spark-koalas

How to change the value in a Koalas DataFrame based on a condition


I am using Koalas and I want to change the value of a column based on a condition.

In pandas I can do that using:

import pandas as pd
df_test = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['one', 'two', 'three']})

df_test2 = pd.DataFrame({
    'c': [2, 1, 3],
    'd': ['one', 'two', 'three']})


df_test.loc[df_test.a.isin(df_test2['c']),'b'] = 'four'

df_test.head()

    a   b
0   1   four
1   2   four
2   3   four


I am trying to do the same in Koalas, but I get this error:

---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
<ipython-input-15-814219258adb> in <module>
      5 new_loans['write_offs'] = 0
      6 
----> 7 new_loans.loc[(new_loans['ID'].isin(userinput_write_offs['id'])),'write_offs'] = 1
      8 new_loans.loc[new_loans['write_offs']==1,'is_active'] = 0
      9 new_loans = new_loans.sort_values(by = ['ZOHOID','Disb Date'])

/usr/local/lib/python3.7/dist-packages/databricks/koalas/base.py in isin(self, values)
    894             )
    895 
--> 896         return self._with_new_scol(self.spark.column.isin(list(values)))
    897 
    898     def isnull(self) -> Union["Series", "Index"]:

/usr/local/lib/python3.7/dist-packages/databricks/koalas/series.py in __iter__(self)
   5871 
   5872     def __iter__(self):
-> 5873         return MissingPandasLikeSeries.__iter__(self)
   5874 
   5875     if sys.version_info >= (3, 7):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
     21     def unsupported_function(*args, **kwargs):
     22         raise PandasNotImplementedError(
---> 23             class_name=class_name, method_name=method_name, reason=reason
     24         )
     25 

PandasNotImplementedError: The method `pd.Series.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

How could I do the same operation in Koalas?

UPDATE

Following this question: Assign Koalas Column from Numpy Result, I have tried:

df_test.loc[df_test.a.isin(df_test2['c'].to_list()),'b'] = 'four'

But now I get this error:

---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
/usr/local/lib/python3.7/dist-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in __repr__(self)
  10614             return self._to_internal_pandas().to_string()
  10615 
> 10616         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  10617         pdf_length = len(pdf)
  10618         pdf = pdf.iloc[:max_display_count]

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  10606     def _get_or_create_repr_pandas_cache(self, n):
  10607         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
> 10608             self._repr_pandas_cache = {n: self.head(n + 1)._to_internal_pandas()}
  10609         return self._repr_pandas_cache[n]
  10610 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _to_internal_pandas(self)
  10602         This method is for internal use only.
  10603         """
> 10604         return self._internal.to_pandas_frame
  10605 
  10606     def _get_or_create_repr_pandas_cache(self, n):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    514     def wrapped_lazy_property(self):
    515         if not hasattr(self, attr_name):
--> 516             setattr(self, attr_name, fn(self))
    517         return getattr(self, attr_name)
    518 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
    807         """ Return as pandas DataFrame. """
    808         sdf = self.to_internal_spark_frame
--> 809         pdf = sdf.toPandas()
    810         if len(pdf) == 0 and len(sdf.schema) > 0:
    811             pdf = pdf.astype(

/usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    136 
    137         # Below is toPandas without Arrow optimization.
--> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    139         column_counter = Counter(self.columns)
    140 

/usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
    594         """
    595         with SCCallSiteSync(self._sc) as css:
--> 596             sock_info = self._jdf.collectToPython()
    597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    598 

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 1110, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'pandas'

Why is it trying to use pandas?


Solution

  • The Koalas package exposes pandas-like APIs to users at a high level, but under the hood the implementation is done with the PySpark APIs (see the PySpark sketch at the end of this answer).

    I observed that in the stack trace you have pasted, a pandas DataFrame is created from the sdf Spark DataFrame using the toPandas() method and assigned to pdf.

    In the implementation of the toPandas() function, pandas and numpy are imported.

    Check line numbers 809 & 138 in the excerpt below.

    /usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
        807         """ Return as pandas DataFrame. """
        808         sdf = self.to_internal_spark_frame
    --> 809         pdf = sdf.toPandas()
        810         if len(pdf) == 0 and len(sdf.schema) > 0:
        811             pdf = pdf.astype(
    
    /usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
        136 
        137         # Below is toPandas without Arrow optimization.
    --> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
        139         column_counter = Counter(self.columns)
        140 
    
    /usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
        594         """
        595         with SCCallSiteSync(self._sc) as css:
    --> 596             sock_info = self._jdf.collectToPython()
        597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
        598 
    

    You can check out the implementation of the toPandas() function at the following link: https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py
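
    To make the "under the hood" point concrete, here is a minimal sketch of how the update from the question could be written directly against the PySpark DataFrame API. It reuses the column names and the 'four' value from the pandas example; it only illustrates the kind of conditional column expression Koalas builds, not the exact plan Koalas generates.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Same toy data as the pandas example in the question.
    sdf_test = spark.createDataFrame([(1, 'one'), (2, 'two'), (3, 'three')], ['a', 'b'])
    sdf_test2 = spark.createDataFrame([(2, 'one'), (1, 'two'), (3, 'three')], ['c', 'd'])

    # Collect the comparison values into a Python list; this is roughly what the
    # .to_list() workaround in the question does at the Koalas level, and it is
    # only appropriate for small lookup data (a join would scale better).
    c_values = [row['c'] for row in sdf_test2.select('c').distinct().collect()]

    # Conditional overwrite of column 'b', the analogue of
    # df_test.loc[df_test.a.isin(df_test2['c']), 'b'] = 'four'
    sdf_test = sdf_test.withColumn(
        'b',
        F.when(F.col('a').isin(c_values), F.lit('four')).otherwise(F.col('b'))
    )
    sdf_test.show()

    Separately, the error in your update's traceback is raised inside a Python worker rather than on the driver, which suggests the executors' Python environment cannot import pandas even though the driver can. A quick, hypothetical way to confirm that is a throwaway job like the sketch below (the function name is illustrative).

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    def try_import_pandas(_):
        # Attempt the import inside each executor task and report the result.
        try:
            import pandas
            return pandas.__version__
        except ImportError as exc:
            return repr(exc)

    # Version strings mean the workers can import pandas; a ModuleNotFoundError
    # repr means the workers' Python environment is missing pandas.
    print(spark.sparkContext.parallelize(range(2), 2).map(try_import_pandas).collect())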