I am new to PySpark, and so far it has been a bit difficult to understand the way it works, especially when you are used to libraries like pandas. But it seems to be the way to go for big data.
For my current ETL job, I have the following elements:
This is my rdd:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
],
[
('SMSG', 'BKT'), ('SQNR', '00000024'), ('STNQ', '06'), ('TRNN', '000002'), ('NRID', ' '), ('TREC', '020'), ('TRNN', '000002'), ('NRID', ' '), ('TACN', '001'), ('CARF', ' '), ...
],
...
]
The raw data is a fixed-width text file.
What I want to do now is to group the tuples by key within each sublist.
The final result should be:
[
[
('SMSG_1', 'BKT'),('SMSG_2','BKS'),('SQNR_1', '00000004'),('SQNR_2', '00000005'),('STNQ_1','06'),('STNQ_2','24'),('TRNN', '000001'),('DAIS', '171231'),...
],
[
('SMSG', 'BKT'),('SQNR', '00000024'),('STNQ','06'),('TRNN', '000002'),('NRID', ' '), ('TREC', '020'), ('TACN', '001'), ('CARF', ' '),...
],
...
]
Basically the rules are as follows:
1- If the keys are the same and the values are also the same, remove the duplicates.
2- If the keys are the same but the values differ, keep every value and rename the key by adding a suffix "_Number", where Number is the iteration number of that key (see the small sketch right after this list for what I mean).
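To make the rules concrete, here is a minimal plain-Python sketch of what I mean, applied to one shortened sublist. It is only an illustration of the intended result, not Spark code, and the names (sublist, grouped, result) are mine:

from collections import OrderedDict

sublist = [('SMSG', 'BKT'), ('SQNR', '00000004'), ('SMSG', 'BKT'), ('SMSG', 'BKS')]

grouped = OrderedDict()
for key, value in sublist:
    values = grouped.setdefault(key, [])
    if value not in values:  # rule 1: drop exact duplicates of a (key, value) pair
        values.append(value)

result = []
for key, values in grouped.items():
    if len(values) == 1:
        result.append((key, values[0]))  # a single value keeps its original key
    else:
        for i, value in enumerate(values, start=1):  # rule 2: suffix the key with its iteration number
            result.append(('{}_{}'.format(key, i), value))

print(result)
# [('SMSG_1', 'BKT'), ('SMSG_2', 'BKS'), ('SQNR', '00000004')]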
My code starts as follows:
def addBKT():
    ...

def prepareTrans():
    ...

if __name__ == '__main__':
    input_folder = '/Users/admin/Documents/Training/FR20180101HOT'
    rdd = sc.wholeTextFiles(input_folder).map(lambda x: x[1].split("BKT"))
    rdd = rdd.flatMap(prepareTrans).map(addBKT).map(lambda x: x.split("\n")).map(hot_to_flat_file_v2)
    print(rdd.take(1))
The print gives me (as shared above) the following list of lists of tuples. I am taking only 1 sublist, but the full RDD has about 2000 sublists of tuples:
[
[
('SMSG', 'BKT'), ('SQNR', '00000004'), ('STNQ', '06'), ('TRNN', '000001'), ('SMSG', 'BKS'), ('SQNR', '00000005'), ('STNQ', '24'), ('DAIS', '171231'), ('TRNN', '000001'), ....
]
]
I first tried to reduce the nested lists as follows:
rdd = rdd.flatMap(lambda x:x).reduceByKey(list)
I was expecting as a result a new list of lists without duplicates, with the tuples that share a key but have different values grouped under that key. However, I was not able to make that work.
As a second step, I was planning to transform each tuple with multiple values into as many new tuples as there are values in the grouped tuple, i.e. ('Key', ['Value1', 'Value2']) becomes ('Key_1', 'Value1'), ('Key_2', 'Value2').
Finally, I want to convert the final RDD to a DataFrame and store it in Parquet format, roughly as sketched below.
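For that last step, this is roughly what I have in mind (the sample data, variable names and output path are hypothetical, and it assumes every processed sublist ends up with the same set of keys):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
sample = [
    [('SMSG_1', 'BKT'), ('SMSG_2', 'BKS'), ('TRNN', '000001')],
    [('SMSG_1', 'BKT'), ('SMSG_2', 'BKS'), ('TRNN', '000002')],
]
final_rdd = spark.sparkContext.parallelize(sample)

# one DataFrame row per sublist, with columns taken from the tuple keys
df = final_rdd.map(lambda pairs: Row(**dict(pairs))).toDF()
df.write.mode("overwrite").parquet('/tmp/FR20180101HOT_parquet')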
I really hope someone has done something like this before. I spent a lot of time trying to do it, but I was not able to make it work, nor was I able to find any example online.
Thank you for your help.
As you are new to Spark, you may not be aware of the Spark DataFrame. The DataFrame is a higher-level concept than the RDD. Here I solved your problem using a PySpark DataFrame. Have a look at this, and don't hesitate to learn about Spark DataFrames.
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, asc, concat, col, lit

rdd1 = sc.parallelize([("SMSG", "BKT"), ("SMSG", "BKT"), ("SMSG", "BKS"), ('SQNR', '00000004'), ('SQNR', '00000005')])
rddToDF = rdd1.toDF(["C1", "C2"])
rddToDF.show()
+----+--------+
| C1| C2|
+----+--------+
|SMSG| BKT|
|SMSG| BKT|
|SMSG| BKS|
|SQNR|00000004|
|SQNR|00000005|
+----+--------+
DfRmDup = rddToDF.drop_duplicates() #Removing duplicates from Dataframe
DfRmDup.show()
+----+--------+
| C1| C2|
+----+--------+
|SQNR|00000004|
|SMSG| BKT|
|SQNR|00000005|
|SMSG| BKS|
+----+--------+
rank = DfRmDup.withColumn("rank", dense_rank().over(Window.partitionBy("C1").orderBy(asc("C2"))))
rank.show()
+----+--------+----+
| C1| C2|rank|
+----+--------+----+
|SQNR|00000004| 1|
|SQNR|00000005| 2|
|SMSG| BKS| 1|
|SMSG| BKT| 2|
+----+--------+----+
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").show()
+------+--------+
| C1| C2|
+------+--------+
|SQNR_1|00000004|
|SQNR_2|00000005|
|SMSG_1| BKS|
|SMSG_2| BKT|
+------+--------+
#Converting back to RDD
rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank").rdd.map(lambda x: (x[0],x[1])).collect()
[('SQNR_1', '00000004'),
('SQNR_2', '00000005'),
('SMSG_1', 'BKS'),
('SMSG_2', 'BKT')]
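Since your end goal is Parquet, you can also keep the result as a DataFrame and write it out directly instead of collecting it back to the driver (the output path here is just an example):

finalDF = rank.withColumn("C1", concat(col("C1"), lit("_"), col("rank"))).drop("rank")
finalDF.write.mode("overwrite").parquet("/tmp/output_parquet")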