Tags: apache-spark, pyspark, amazon-emr

PySpark textFile replace text


The following are a few rows from an example file that is ~30 GB:

### s3://mybucket/tmp/file_in.txt
"one"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"ann"|"845"|"2012-06-11"

I'd like to use PySpark to...

  • read the text file using Spark's parallelism
  • replace the "n" character with "X"
  • output the updated text to a new text file with the same format

so the resulting file would look like this:

### s3://mybucket/tmp/file_out.txt
"oXe"|"mike"|"456"|"2010-01-04"
"two"|"lisa"|"789"|"2011-03-08"
"three"|"aXX"|"845"|"2012-06-11"

I have tried a variety of ways to achieve this seemingly simple task...

data = sc.textFile('s3://mybucket/tmp/file_in.txt')

def make_replacement(row):
    result = row.replace("n", "X")
    return result

out_data = data.map(make_replacement).collect()

#out_data = data.map(lambda line: make_replacement(line)).collect()

out_data.coalesce(1).write.format("text").option("header", "false").save("s3://mybucket/tmp/file_out.txt")

but I continue to see the following errors:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 21, <<my_server>>, executor 9): java.lang.RuntimeException: Failed to run command: /usr/bin/virtualenv -p python3 --system-site-packages virtualenv_application....
    at org.apache.spark.api.python.VirtualEnvFactory.execCommand(VirtualEnvFactory.scala:120)

Note: solutions using read.csv or DataFrames will not be applicable to this problem.
Any recommendations on how to solve this?


Solution

  • You can build a list of regexp_replace expressions, one per column, and pass that list to select:

    from pyspark.sql import functions as F
    
    # the file is pipe-delimited; the second positional argument of
    # read.csv is the schema, so pass the separator as sep='|'
    df = spark.read.csv('s3://mybucket/tmp/file_in.txt', sep='|')
    expr = [F.regexp_replace(F.col(column), "n", "X").alias(column) for column in df.columns]
    
    df = df.select(expr)
    # quoteAll re-quotes every field so the output matches the input layout
    df.write.option("header", "false").option("sep", "|").option("quoteAll", "true").csv("s3://mybucket/tmp/file_out.txt")
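
  • If you must avoid read.csv and DataFrames, as the question's note requires, the same replacement can be done on the RDD API. The sketch below is not part of the original answer: it is a hedged rewrite of the question's own attempt that reuses the question's paths, drops collect() so the data stays distributed, and writes with saveAsTextFile. It also assumes the virtualenv failure in the stack trace is an executor environment problem to be fixed separately.

    # read the file as an RDD of raw lines, in parallel across executors
    data = sc.textFile('s3://mybucket/tmp/file_in.txt')
    
    # replace every "n" in each raw line with "X"; the quoted, pipe-delimited
    # layout is untouched because the line is edited as plain text
    out_data = data.map(lambda line: line.replace("n", "X"))
    
    # saveAsTextFile writes a directory of part files; coalesce(1) forces a
    # single part, which is slow for a ~30 GB input, so drop it at scale
    out_data.coalesce(1).saveAsTextFile('s3://mybucket/tmp/file_out.txt')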