Tags: apache-spark, hadoop, rdd, hadoop-partitioning

Spark RDD: partitioning according to text file format


I have a text file containing tens of GBs of data, which I need to load from HDFS and parallelize as an RDD. The file describes items in the following format. Note that the labels in parentheses are not present in the file (the meaning of each row is implicit), and that a row can contain whitespace to separate different values:

0001           (id)
1000 1000 2000 (dimensions)
0100           (weight)
0030           (amount)
0002           (id)
1110 1000 5000 (dimensions)
0220           (weight)
3030           (amount)

I reckon that the most immediate approach to parallelizing this file would be to upload it to HDFS from the local filesystem and then create an RDD by calling sc.textFile(filepath). In that case, however, the partitioning would depend on the HDFS splits of the file.

The problem with this approach is that a partition can contain incomplete items. For example:

Partition 1

0001           (id)
1000 1000 2000 (dimensions)
0100           (weight)
0030           (amount)
0002           (id)
1110 1000 5000 (dimensions)

Partition 2

0220           (weight)
3030           (amount)

Thus, when we call a method on each partition and pass it the corresponding block of data, it receives an incomplete specification for the item identified as 0002, and the calculations performed inside that method produce wrong output.

What would be the most efficient way to partition or repartition this RDD to avoid this problem? Can I specify that the number of lines in each partition be a multiple of 4? If so, should that be done by Hadoop or by Spark?


Solution

  • Load the text file to obtain an RDD[String], then use zipWithIndex to convert it to an RDD[(String, Long)], where the second element of each tuple is the index of that line in the RDD.

    From the zipWithIndex documentation: "Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index."

    • Using the index as the line number (starting from 0), we can group the lines that belong to one record, e.g. [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...].
    • Since we know that each record spans exactly 4 lines, take the integer division of the index by 4 (let's call this idx_div). The first four lines get 0 as idx_div, the next four lines get 1, and so on, e.g. [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, ...]. This value can be used to group the four lines belonging to one record for further parsing and processing.
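
    The index arithmetic described above can be tried on a small local collection before running it on a cluster. This is only a sketch: it uses a plain Scala Seq in place of an RDD, and the object name IdxDivDemo and the sample lines are assumptions made for illustration.

```scala
object IdxDivDemo {
  // Eight sample lines: two records of four lines each, laid out as in the question.
  val lines: Seq[String] = Seq(
    "0001", "1000 1000 2000", "0100", "0030",
    "0002", "1110 1000 5000", "0220", "3030"
  )

  // Pair each line with its position, then integer-divide the position by 4.
  // Note: Seq.zipWithIndex yields Int indices, whereas RDD.zipWithIndex yields Long.
  val idxDiv: Seq[Int] = lines.zipWithIndex.map { case (_, idx) => idx / 4 }

  // Group by idx_div, order the groups by key, restore the line order inside
  // each group, and drop the indices again.
  val grouped: Seq[Seq[String]] = lines.zipWithIndex
    .groupBy { case (_, idx) => idx / 4 }
    .toSeq
    .sortBy { case (key, _) => key }
    .map { case (_, linesWithIdx) => linesWithIdx.sortBy(_._2).map(_._1) }
}
```

    Here idxDiv holds one key per line, and grouped holds one four-line group per record.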


    // Each record spans exactly four consecutive lines of the input file.
    case class Record(id: String, dimensions: String, weight: String, amount: String)

    val lines = sc.textFile("...")
    val records = lines
        .zipWithIndex                            // RDD[(String, Long)]: (line, line number)
        .groupBy { case (_, idx) => idx / 4 }    // group by idx_div
        .map { case (_, linesWithIdx) =>
            // Order within a group is not guaranteed, so sort by line number first
            val List(id, dimensions, weight, amount) =
                linesWithIdx.toList.sortBy(_._2).map(_._1)
            Record(id, dimensions, weight, amount)
        }
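
One thing to watch in the code above: the `val List(...)` destructuring throws a scala.MatchError if a group does not hold exactly four lines, for example when the file's line count is not a multiple of 4. A possible guard is sketched below on a local collection rather than an RDD. The object name SafeRecordDemo and the helper toRecord are hypothetical names introduced for this illustration, and silently skipping malformed groups is an assumption about the desired behavior.

```scala
object SafeRecordDemo {
  case class Record(id: String, dimensions: String, weight: String, amount: String)

  // Hypothetical helper: returns None unless the group has exactly four lines.
  def toRecord(group: Seq[String]): Option[Record] = group match {
    case Seq(id, dimensions, weight, amount) =>
      Some(Record(id, dimensions, weight, amount))
    case _ => None
  }

  // Six sample lines: one complete record followed by a truncated one.
  val lines: Seq[String] = Seq(
    "0001", "1000 1000 2000", "0100", "0030",
    "0002", "1110 1000 5000"
  )

  // Chunk the lines into groups of up to four and keep only the complete records.
  val records: Seq[Record] = lines.grouped(4).toSeq.flatMap(g => toRecord(g))
}
```

In the Spark version, a helper like this could be applied with flatMap in place of map, so that an incomplete group is dropped instead of failing the job. Whether dropping or failing is preferable depends on how much the input can be trusted.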