I am using pandas-on-spark in combination with regex to remove some abbreviations from a column in a dataframe. In pandas this all works fine, but I have been tasked with migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-spark. However, I am running into a weird error. I'm using the following function to clean up the abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list with three regex patterns).
import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.
    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }
    # Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())
    # For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)
    return job_list
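One detail of the function above that may be relevant: pandas-on-spark is lazy, so every `str.replace` call in the nested loop adds another operation to the plan, and with 61 abbreviations and three patterns that is 183 stacked replacements before anything runs. Whether this long chain is what triggers the StackOverflowError is an assumption; the trace does not prove it. Under that assumption, an untested sketch of a consolidated alternative folds all abbreviations into one compiled Python regex and expands them in a single pass per string. `ABBREVIATIONS`, `build_pattern` and `expand_abbreviations` are hypothetical names, and the boundary rules only approximate the original patterns: the dotted form also requires a left boundary, no trailing space is appended, and expansions are not re-scanned by later abbreviations.

```python
import re

# Hypothetical subset of the question's abbreviations_dict (the real one has 61 keys).
ABBREVIATIONS = {
    "1e": "eerste",
    "1ste": "eerste",
    "2e": "tweede",
    "sr": "senior",
    "tech": "technisch",
    "zw": "zelfstandig werkend",
}


def build_pattern(abbreviations):
    """Compile ONE regex that matches any of the abbreviations."""
    # Try longer keys first, so a key is never shadowed by a shorter prefix of it.
    alternatives = "|".join(
        re.escape(key) for key in sorted(abbreviations, key=len, reverse=True)
    )
    # Left boundary: start of string, or preceded by a space, backslash or "(".
    # Right side: either a trailing dot (consumed), or followed by a space,
    # backslash, ")" or the end of the string.
    return re.compile(rf"(?<![^ \\(])({alternatives})(?:\.|(?=[ \\)]|$))")


def expand_abbreviations(text, pattern, abbreviations):
    """Expand every abbreviation in a single left-to-right pass over text."""
    if not isinstance(text, str):
        # Leave nulls / NaN untouched instead of failing inside re.sub.
        return text
    return pattern.sub(lambda match: abbreviations[match.group(1)], text)


PATTERN = build_pattern(ABBREVIATIONS)
```

On pandas-on-spark, the function could then be applied once per batch of rows, for example `df['SchoneFunctie'].pandas_on_spark.transform_batch(lambda s: s.map(lambda t: expand_abbreviations(t, PATTERN, ABBREVIATIONS)))`, which puts one operation in the plan instead of one per abbreviation/pattern pair.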
When I then call the function with a pspd Series, and perform an action so the query plan is executed:
df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))
it throws a java.lang.StackOverflowError. The stack trace is too long to paste here; I pasted a subset of it, since it repeats.
23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
It goes on like this for quite a while, until I get:
23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
self.process_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
self.finish_request(request, client_address)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
self.handle()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
poll(accum_updates)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
if func():
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
num_updates = read_int(self.rfile)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
length = stream.read(4)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
coro = func()
File "<input>", line 194, in <module>
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
return self._internal.to_pandas_frame
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
setattr(self, attr_name, fn(self))
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
pdf = sdf.toPandas()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
sock_info = self._jdf.collectToPython()
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
return f(*a, **kw)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>
Some things I've tried / facts I think could be relevant:
pspd.Dataframe makes the .apply(lambda ...) fail inside the resolve_abbreviations function.
Any help would be greatly appreciated. Perhaps I am better off avoiding the pandas-on-spark API and converting the code to regular PySpark, as the pandas-on-spark API apparently isn't mature enough yet to run pandas scripts "as is"? Or perhaps our code design is flawed by nature and there is another efficient way to achieve similar results?
Is it possible that your input data is deeply nested? This could explain the repeating stack frames you can see in the trace.
The first thing I would try is running with a larger stack size than you're using now. I'm not sure what OS/Java version you're running this on, so I can't know what the default stack size on your machine is. Typically, though, it is on the order of 100 KB to 1024 KB.
Try running it with a stack size of 4 MB. In the JVM, this is set with the -Xss parameter. You'll want to do this on the driver, with the spark.driver.extraJavaOptions config parameter. Something like this:
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("whateverMasterYouHave")
        .setAppName("MyApp")
        .set("spark.driver.extraJavaOptions", "-Xss4M"))
sc = SparkContext.getOrCreate(conf=conf)
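Two details may matter when applying this: `.set` overwrites any extraJavaOptions that are already configured, and on a real cluster the tasks are deserialized in the executor JVMs, which read `spark.executor.extraJavaOptions` instead. The helper below is a hypothetical, pure-Python sketch (`with_stack_size`, `stack_size_settings` and `STACK_SIZE_KEYS` are illustrative names, not Spark APIs) that adds or replaces the -Xss flag for both the driver and the executors while keeping any other JVM options.

```python
import shlex

# Spark settings that carry extra JVM flags. The driver JVM also runs the tasks in
# local mode (the trace says "executor driver"); on a cluster, executors run them.
STACK_SIZE_KEYS = ("spark.driver.extraJavaOptions", "spark.executor.extraJavaOptions")


def with_stack_size(java_options, stack_size="4M"):
    """Return java_options with exactly one -Xss flag, set to stack_size."""
    # Keep every existing option except an earlier -Xss setting.
    kept = [opt for opt in shlex.split(java_options or "") if not opt.startswith("-Xss")]
    kept.append(f"-Xss{stack_size}")
    # shlex.join (Python 3.8+) re-quotes options that contain spaces.
    return shlex.join(kept)


def stack_size_settings(current, stack_size="4M"):
    """Build driver and executor extraJavaOptions values from the current settings."""
    return {key: with_stack_size(current.get(key, ""), stack_size) for key in STACK_SIZE_KEYS}
```

With the snippet above, the values could be applied before the context is created, e.g. `for key, value in stack_size_settings(dict(conf.getAll())).items(): conf.set(key, value)`. Because `SparkContext.getOrCreate` returns an already running context as-is, and JVM flags are only read when the JVM starts, an interactive session such as the PyCharm console most likely has to be restarted for the new stack size to take effect.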