The following is a few rows from an example file which is ~ 30GB
### s3://mybucket/tmp/file_in.txt
"one"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"ann"|"845"|"2012-06-11"
I'd like to use PySpark to...
so the resulting file would look like this:
### s3://mybucket/tmp/file_out.txt
"oXe"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"aXX"|"845"|"2012-06-11"
I have tried a variety of ways to achieve this seemingly simple task...
data = sc.textFile('s3://mybucket/tmp/file_in.txt')
def make_replacement(row):
result = row.replace("n", "X")
return result
out_data = data.map(make_replacement).collect()
#out_data = data.map(lambda line: make_replacement(line)).collect()
out_data.coalesce(1).write.format("text").option("header", "false").save("s3://mybucket/tmp/file_out.txt")
but I continue to see the following errors:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 21, <<my_server>>, executor 9): java.lang.RuntimeException: Failed to run command: /usr/bin/virtualenv -p python3 --system-site-packages virtualenv_application....
at org.apache.spark.api.python.VirtualEnvFactory.execCommand(VirtualEnvFactory.scala:120)
Note: solutions using read.csv
or dataframe will not be applicable to this problem
Any recommendations on how to solve this?
You can create an expression and call the expression in select
from pyspark.sql import functions as F
df = spark.read.csv('s3://mybucket/tmp/file_in.txt','\t')
expr = [F.regexp_replace(F.col(column), pattern="n", replacement="X").alias(column) for column in df.columns]
df = df.select(expr)
df.write.csv.format("text").option("header", "false").save("s3://mybucket/tmp/file_out.txt")