Tags: python, pandas, apache-spark, pyspark, pyspark-pandas

Pandas-on-Spark throwing java.lang.StackOverflowError


I am using pandas-on-Spark together with regex to write out some abbreviations in a column of a DataFrame. In pandas this all works fine, but I have been tasked with migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-Spark. However, I am running into a weird error. I'm using the following function to clean up the abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list of three regex patterns).

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    # Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    # For each abbreviation in this list
    for abb in abbreviations_pob:
        # Define the patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # Recode the abbreviation to its written-out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=patt, repl=f'{value_to_replace} ', regex=True)

    return job_list
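To see what the loop does to a single value, here is the same logic over plain Python strings with `re.sub`, which is what pandas' `str.replace(..., regex=True)` applies per element. This is a sketch: the dictionary is cut down to three entries, only the two patterns shown above are used, and `resolve_abbreviations_str` is a hypothetical name.

```python
import re

# Cut-down, illustrative subset of the question's abbreviations_dict.
ABBREVIATIONS = {"sr": "senior", "tech": "technisch", "cto": "chief technology officer"}


def resolve_abbreviations_str(text: str) -> str:
    """Apply the question's two patterns per abbreviation, one re.sub at a time."""
    for abb, written_out in ABBREVIATIONS.items():
        patterns = [
            # abb preceded by start, space, backslash or "(" and followed by
            # space, backslash, end or ")"
            fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
            # abb directly followed by a period
            fr'{abb}\.',
        ]
        for patt in patterns:
            # Every pattern is one more full pass over the string; in the
            # pandas-on-Spark version every pass is one more lazily recorded step.
            text = re.sub(patt, f'{written_out} ', text)
    return text


print(resolve_abbreviations_str("sr tech manager"))  # senior  technisch  manager
print(resolve_abbreviations_str("sr.developer"))     # senior developer
```

Note that the trailing space in the replacement leaves a doubled space whenever the abbreviation was already followed by one.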

When I then call the function on a pspd.Series and trigger an action so that the query plan is executed:

df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))

it throws a java.lang.StackOverflowError. The stack trace is too long to paste here in full; since it repeats, I pasted only a subset.

23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    ...

It goes on like this for quite a while, until I get:

23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
    poll(accum_updates)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
    if func():
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>

Some things I've tried, and facts I think could be relevant:

  • For now I am running this locally, on a subset of 5000 rows of data, so data volume shouldn't be the problem. Perhaps increasing some default configuration value could still help.
  • I think this has to do with Spark's lazy evaluation, with the DAG getting too big because of the for-loops in the function, but I have no idea how to solve the problem. Following the pandas-on-Spark best-practices documentation, I tried to implement checkpointing, but it is not available for pspd.Series, and converting my Series into a pspd.DataFrame makes the .apply(lambda ...) inside the resolve_abbreviations function fail.
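The lazy-evaluation suspicion in the last bullet can be made concrete with a toy model that needs no Spark. It assumes that every `str.replace` call wraps the previous column in one more recorded step; `Column`, `Replace` and `build_plan` are hypothetical names, not pandas-on-Spark classes. With 61 abbreviations and three patterns the final expression is 183 steps deep, and a recursive walk over it (which the repeating ObjectInputStream frames suggest happens when the task is deserialized) needs stack space proportional to that depth.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for the source column."""
    name: str


@dataclass(frozen=True)
class Replace:
    """Hypothetical stand-in for one lazily recorded str.replace step."""
    child: Union[Column, "Replace"]
    pattern: str
    repl: str


def build_plan(abbreviations: dict, n_patterns: int) -> Union[Column, Replace]:
    """Mimic the nested loops: nothing is computed, steps are only stacked."""
    expr: Union[Column, Replace] = Column("SchoneFunctie")
    for abb, written_out in abbreviations.items():
        for i in range(n_patterns):
            expr = Replace(expr, f"pattern{i}({abb})", written_out + " ")
    return expr


def depth(expr: Union[Column, Replace]) -> int:
    """Count the Replace steps nested around the column, recursively."""
    if isinstance(expr, Column):
        return 0
    return 1 + depth(expr.child)


# 61 placeholder abbreviations with three patterns each, as in the question.
plan = build_plan({f"abb{i}": f"word{i}" for i in range(61)}, n_patterns=3)
print(depth(plan))  # 183
```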

Any help would be greatly appreciated. Perhaps I am better off avoiding the pandas-on-Spark API and rewriting the code in regular PySpark, as the pandas-on-Spark API apparently isn't mature enough yet to run pandas scripts "as is"? Or perhaps our code design is inherently flawed and there is another efficient way to achieve similar results?
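On the last question, one way to keep the plan shallow is to do all replacements for a value in a single pass: one compiled regex that alternates over every abbreviation, plus a dictionary lookup to pick the written-out form. The sketch below is plain Python and simplifies the original patterns (one shared delimiter rule, a consumed period becomes a space, whitespace is normalized at the end); `expand_abbreviations` and the cut-down dictionary are hypothetical. In pandas-on-Spark it could then be applied once per value, for example with `df['SchoneFunctie'].apply(expand_abbreviations)`, which should add one step to the plan instead of one per pattern; that part is untested here.

```python
import re

# Cut-down, illustrative subset of the question's abbreviations_dict.
ABBREVIATIONS = {
    "1e": "eerste",
    "1ste": "eerste",
    "ceo": "chief executive officer",
    "cto": "chief technology officer",
    "sr": "senior",
    "tech": "technisch",
}

# Longest keys first so a longer abbreviation is tried before a shorter one.
_alternation = "|".join(
    re.escape(abb) for abb in sorted(ABBREVIATIONS, key=len, reverse=True)
)

# An abbreviation must start the string or follow a space, backslash or "(",
# and must be followed either by a period (consumed) or by a space,
# backslash, ")" or the end of the string (not consumed).
_PATTERN = re.compile(rf"(?<![^ \\(])({_alternation})(?:\.|(?![^ \\)]))")


def _replace(match: re.Match) -> str:
    written_out = ABBREVIATIONS[match.group(1)]
    # A consumed period becomes a space so "sr.developer" stays two words.
    return written_out + " " if match.group(0).endswith(".") else written_out


def expand_abbreviations(text: str) -> str:
    """Replace every abbreviation in one pass, then collapse repeated spaces."""
    expanded = _PATTERN.sub(_replace, text)
    return re.sub(" +", " ", expanded).strip()


print(expand_abbreviations("sr tech manager"))        # senior technisch manager
print(expand_abbreviations("1ste medewerker (cto)"))  # eerste medewerker (chief technology officer)
```

Unlike the loop, each value is scanned once regardless of how many abbreviations the dictionary holds, and an already expanded word is never rescanned by a later pattern.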


Solution

  • Is it possible that your input data is deeply nested? That could contribute to the repeating stack frames you can see in the trace.

    The first thing I would try is running with a larger stack size than you are using now. I'm not sure which OS and Java version you're running this on, so I can't know the default stack size on your machine. Typically, though, it is on the order of 100 KB to 1024 KB.

    Try running it with a stack size of 4 MB. In the JVM this is set with the -Xss option. You'll want to set it on the driver, through the spark.driver.extraJavaOptions config parameter; since you're running locally, your tasks run inside the driver JVM (note "executor driver" in the log). Something like this:

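    from pyspark import SparkConf, SparkContext

    # -Xss only takes effect when the JVM starts, so run this before any
    # SparkContext/SparkSession exists; otherwise getOrCreate() returns the
    # already running context and this conf is ignored.
    conf = (SparkConf()
            .setMaster("whateverMasterYouHave")  # placeholder, e.g. "local[*]"
            .setAppName("MyApp")
            .set("spark.driver.extraJavaOptions", "-Xss4M")
            # On a cluster, tasks run in executor JVMs, which need it too.
            .set("spark.executor.extraJavaOptions", "-Xss4M"))
    sc = SparkContext.getOrCreate(conf=conf)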
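If setting a SparkConf in code is inconvenient (for example in an IDE console), the same driver option can be passed to spark-submit through the PYSPARK_SUBMIT_ARGS environment variable, which PySpark reads when it launches the JVM itself from a plain Python or IDE session; the value has to end with pyspark-shell. A minimal sketch, assuming no SparkSession has been created yet in the process (a JVM that is already running keeps its stack size); `stack_size_submit_args` and `start_session` are hypothetical helpers.

```python
import os


def stack_size_submit_args(stack_size: str = "4M") -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that raises the driver thread stack size."""
    # --driver-java-options is a spark-submit flag; the trailing "pyspark-shell"
    # is required when PySpark starts the JVM from Python.
    return f'--driver-java-options "-Xss{stack_size}" pyspark-shell'


def start_session():
    """Start a SparkSession once the variable is set (not called in this sketch)."""
    from pyspark.sql import SparkSession  # imported lazily; requires PySpark

    return SparkSession.builder.appName("MyApp").getOrCreate()


# Must run before the first SparkContext/SparkSession is created in this process.
os.environ["PYSPARK_SUBMIT_ARGS"] = stack_size_submit_args("4M")
```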