apache-spark | pyspark | rdd

Saving and Loading wholeTextFiles using Spark RDD


I need to perform batch processing of some text files in Spark. Basically, someone gave me tons of malformed CSV files: each contains many lines of header data in an arbitrary text format, followed by many lines of properly formatted CSV data. I need to split each file into two, or at least get rid of the header somehow.

Anyway, I have read that you can get an RDD with the format:

[(filename, content)]

by using

spark \
    .sparkContext \
    .wholeTextFiles(input_files_csv)

I would then like to perform a map operation on this RDD that produces another RDD in exactly the same format:

[(newfilename, content)]
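
Roughly, what I have in mind is something like the following, where is_csv_line is just a made-up placeholder for whatever check identifies the first properly formatted CSV row:

def strip_header(pair):
    filename, content = pair
    lines = content.splitlines()
    # skip the arbitrary header: keep everything from the first real CSV row onward
    start = next(i for i, line in enumerate(lines) if is_csv_line(line))
    return (filename + "-clean", "\n".join(lines[start:]))

cleaned = spark \
    .sparkContext \
    .wholeTextFiles(input_files_csv) \
    .map(strip_header)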

I would then like the cluster to save these contents under these filenames.

I haven't been able to find a write command that will do this for me. I can save the raw RDD, but I can't save it as normal files that I could later read back as DataFrames.

I suppose I could remove the headers and then save everything as one single giant CSV with the filename as a new column, but I feel like that would not be as effective.
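
For what it's worth, that fallback would look roughly like this, building on the cleaned RDD sketched above (the output path is just an example):

rows = cleaned.flatMap(
    lambda pair: [(pair[0], line) for line in pair[1].splitlines()]
)
df = rows.toDF(["source_file", "csv_line"])
df.write.csv("/tmp/combined-output")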

Does anyone have a solution to my problem?


Solution

  • This is Scala, but it shouldn't be very different in Python. Inside the foreach I'm not using anything Spark-specific to write the files, just the regular Hadoop APIs.

    import java.io.PrintWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    sc.wholeTextFiles("/tmp/test-data/")
      .foreach { x =>
        val filename = x._1  // fully qualified URI of the source file
        val content = x._2   // entire file contents as one string
        // plain Hadoop API: open the filesystem on the executor and write a copy
        val fs = FileSystem.get(new Configuration())
        val output = fs.create(new Path(s"${filename}-copy"))
        val writer = new PrintWriter(output)
        writer.write(content)
        writer.close()
      }
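
    A rough, untested PySpark sketch of the same idea. Plain Python file I/O inside foreach only works when the paths are on a local or shared filesystem that every executor can see; for HDFS you would need a Python HDFS client (for example pyarrow) on the workers, since the JVM Hadoop FileSystem isn't reachable from Python code running inside tasks.

    def write_copy(pair):
        filename, content = pair
        # wholeTextFiles yields URIs like "file:/tmp/test-data/a.csv";
        # strip the scheme so plain Python I/O can use the path
        local_path = filename.replace("file:", "", 1) + "-copy"
        with open(local_path, "w") as out:
            out.write(content)

    sc.wholeTextFiles("/tmp/test-data/").foreach(write_copy)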