My goal is to call an external (dotnet) process from pyspark via RDD.pipe. Since this failed, I wanted to test piping to a simple command first:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()
# 'command' holds the external command to run (the dotnet call, or a simple test command)
result_rdd = spark.sparkContext.parallelize(['1', '2', '', '3']).pipe(command).collect()
However, I get the error message:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) ( executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\projectpath\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 686, in main
File "C:\projectpath\.venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 676, in process
File "C:\projectpath\.venv\lib\site-packages\pyspark\rdd.py", line 540, in func
return f(iterator)
File "C:\projectpath\.venv\lib\site-packages\pyspark\rdd.py", line 1117, in func
pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
File "C:\Users\username\AppData\Local\Programs\Python\Python39\lib\subprocess.py", line 951, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "C:\Users\username\AppData\Local\Programs\Python\Python39\lib\subprocess.py", line 1420, in _execute_child
hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
FileNotFoundError: [WinError 2] The system cannot find the file specified
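The traceback shows that pipe() ultimately launches the command via subprocess.Popen on shlex.split(command) (rdd.py, line 1117). To check whether the command itself can be launched at all, the equivalent call can be made outside Spark — a minimal sketch, where the command name is only a placeholder:

import shlex
from subprocess import Popen, PIPE

command = "findstr x"  # placeholder: any executable that reads stdin and writes stdout

# Mirrors pyspark's internal call from rdd.py line 1117 above, except that env
# is left as None here, so the child process inherits the parent environment.
proc = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE)
out, _ = proc.communicate(b"1\nx2\n")
print(out)  # findstr echoes back the lines containing "x"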
Update: I found a workaround that makes it work for me. I had a look at the pyspark implementation of the pipe function: if no env parameter is given, it passes an empty dictionary as the env argument to Popen, which produced the same error for me when I called Popen directly with an empty env. Simply passing a non-empty dictionary fixed the problem for now:
pipe(command, env={"1": "2"})  # any non-empty dict avoids the empty default
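A slightly less arbitrary variant of the workaround is to hand the parent environment to the piped process instead of a dummy dictionary — a sketch, under the assumption that exposing the full environment to the external command is acceptable (the command name is again a placeholder):

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

command = "findstr x"  # placeholder for the external command, as above

# Passing the parent environment explicitly keeps variables such as PATH and
# SystemRoot available to the child process, instead of the dummy {"1": "2"}.
result_rdd = (
    spark.sparkContext
    .parallelize(['1', '2', '', '3'])
    .pipe(command, env=dict(os.environ))
    .collect()
)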