I have a text file containing tens of GBs of data, which I need to load from HDFS and parallelize as an RDD. The file describes items in the following format. Note that the labels in parentheses are not present in the file (the meaning of each row is implicit), and that a row can contain whitespace separating several values:
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
0220 (weight)
3030 (amount)
I reckon that the most immediate approach to parallelizing this file would be to upload it to HDFS from the local filesystem and then create an RDD by executing sc.textFile(filepath). However, in this case, the partitioning would depend on the HDFS splits corresponding to the file.
The problem with said approach is that each partition can contain incomplete items. For example:
Partition 1
0001 (id)
1000 1000 2000 (dimensions)
0100 (weight)
0030 (amount)
0002 (id)
1110 1000 5000 (dimensions)
Partition 2
0220 (weight)
3030 (amount)
Thus, when we call a method for each partition and pass its corresponding data block to it, it will be receiving an incomplete specification for the item identified as 0002. This will result in a wrong output for the calculations performed inside the called method.
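To make the problem concrete, here is a minimal sketch that uses plain Scala collections as a stand-in for the two partitions (no Spark involved). The names fileLines, partition1 and partition2 and the split point after line 6 are assumptions taken from the example above, not something Spark or HDFS would necessarily produce:

```scala
// Plain Scala collections standing in for the file contents (assumption: no Spark needed here).
val fileLines = Vector(
  "0001", "1000 1000 2000", "0100", "0030",
  "0002", "1110 1000 5000", "0220", "3030"
)

// Hypothetical split point: the first partition ends after line 6, as in the example above.
val (partition1, partition2) = fileLines.splitAt(6)

// Chunking each partition into groups of four lines independently
// leaves fragments that are not complete items.
val perPartitionGroups = Seq(partition1, partition2).flatMap(_.grouped(4))
val incomplete = perPartitionGroups.filter(_.size != 4)

println(incomplete)
```

Both fragments of item 0002 end up in incomplete, one from each partition, so neither partition can process that item on its own.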
What would be the most efficient way to partition or repartition this RDD to avoid this problem? Can I specify the number of lines in each partition to be a multiple of 4? If so, should it be done by Hadoop or Spark?
Load the text file to obtain an RDD[String], then use zipWithIndex to convert it to an RDD[(String, Long)], where the second element of each tuple is the index of that element in the RDD.
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
The lines therefore get the indices [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]. Integer-dividing each index by 4 gives a value we can call idx_div: the first four lines will get 0 as idx_div, the next four lines will get 1 as idx_div, and so on, e.g. [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, ...]. This can be used to group all (four) lines belonging to one record for further parsing and processing:
case class Record(id: String, dimensions: String, weight: String, amount: String)
val lines = sc.textFile("...")
val records = lines
  .zipWithIndex
  .groupBy { case (_, idx) => idx / 4 } // group by idx_div
  .map { case (_, lines_with_idx) =>
    // groupBy does not guarantee ordering within a group, so sort by the original index
    val lines_list = lines_with_idx.toList.sortBy(_._2).map(_._1)
    // Throws a MatchError if the group does not contain exactly four lines
    val List(id, dimensions, weight, amount) = lines_list
    Record(id, dimensions, weight, amount)
  }
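The index arithmetic can be checked locally before running it on the cluster. The following is only a sketch on plain Scala collections, not on an RDD: the sample lines are the two items from the question, and the Item case class, the name sampleLines, and the choice to parse the fields as integers are assumptions made for illustration. Groups that do not have exactly four lines (for example a truncated file) are skipped here rather than raising an error.

```scala
// Hypothetical typed record, assuming every field holds whitespace-separated integers.
case class Item(id: String, dimensions: Vector[Int], weight: Int, amount: Int)

// The two sample items from the question, one field per line.
val sampleLines = Vector(
  "0001", "1000 1000 2000", "0100", "0030",
  "0002", "1110 1000 5000", "0220", "3030"
)

val items = sampleLines.zipWithIndex
  .groupBy { case (_, idx) => idx / 4 }   // idx_div: 0,0,0,0,1,1,1,1
  .toSeq
  .sortBy(_._1)                           // restore record order after grouping
  .collect { case (_, group) if group.size == 4 =>
    // Sort by the original index so the four fields are in file order.
    val Seq(id, dims, weight, amount) = group.sortBy(_._2).map(_._1)
    Item(id, dims.trim.split("\\s+").map(_.toInt).toVector, weight.trim.toInt, amount.trim.toInt)
  }

items.foreach(println)
```

The same grouping key (index / 4) is what the RDD version uses. On an RDD, groupBy involves a shuffle, so lines of one record that were read in different partitions are brought together before parsing.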