Tags: csv, dataframe, pyspark, rdd

Adding a unique consecutive row number to a DataFrame in PySpark


I want to add a unique consecutive row number to my DataFrame in PySpark, and I don't want to use the monotonically_increasing_id or partitionBy methods. This question might be a duplicate of similar questions asked earlier, but I'm still looking for advice on whether I'm doing it the right way. The following is a snippet of my code. I have a CSV file with the set of input records below:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

and I have loaded this csv file into a dataframe.

PATH_TO_FILE = "file:///u/user/vikrant/testdata/EMP_FILE.csv"

# Read the CSV with a header row, inferring column types and
# dropping malformed records.
emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ",") \
  .load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+
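
As an aside, I believe the Databricks CSV source is built into Spark 2.x, so the same load can be written with the shorter built-in reader (assuming Spark 2.0 or later):

# Equivalent load using the built-in CSV reader.
emp_df = spark.read.csv(PATH_TO_FILE, header=True, inferSchema=True,
                        mode="DROPMALFORMED", sep=",")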

# Pair each Row with a 0-based index, then flatten each pair into a plain list.
empRDD = emp_df.rdd.zipWithIndex()
newRDD = empRDD.map(lambda x: list(x[0]) + [x[1]])
newRDD.take(2)
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

When I appended the int index to the list, I lost the DataFrame schema.

newdf = newRDD.toDF(['emp_id', 'emp_name', 'emp_city', 'emp_salary', 'row_id'])
newdf.show()

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+
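
One way to keep the inferred column types instead of renaming everything through toDF, I believe, is to extend the original schema with an explicit LongType field and rebuild the DataFrame from the indexed RDD:

from pyspark.sql.types import LongType, StructField, StructType

# Append a non-nullable row_id field to the original (inferred) schema,
# then rebuild the DataFrame with that schema preserved.
new_schema = StructType(emp_df.schema.fields + [StructField("row_id", LongType(), False)])
indexed_rdd = emp_df.rdd.zipWithIndex().map(lambda x: tuple(x[0]) + (x[1],))
newdf = spark.createDataFrame(indexed_rdd, new_schema)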

Am I doing this the right way? Or is there a better way to add a row number while preserving the DataFrame schema in PySpark?

Is it feasible to use the zipWithIndex method to add unique consecutive row numbers to a large DataFrame as well? Can this row_id be used to repartition the DataFrame so that the data is distributed uniformly across the partitions?


Solution

  • I have found a solution and it's very simple. Since my DataFrame has no column that holds the same value across all rows, row_number does not generate globally unique row numbers when used with a partitionBy clause on an existing column.

    Let's add a new column to the existing DataFrame with some default value in it:

    from pyspark.sql.functions import lit

    # Constant column so that a single window can span the whole DataFrame.
    emp_df = emp_df.withColumn("new_column", lit("ABC"))
    

    and create a window specification with partitionBy on that "new_column" column:

    from pyspark.sql.functions import lit, row_number
    from pyspark.sql.window import Window

    # Every row falls into the single "ABC" partition, so row_number
    # produces one consecutive sequence over the whole DataFrame.
    w = Window.partitionBy('new_column').orderBy(lit('A'))
    df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")
    

    and you will get the desired result:

    +------+--------------------+--------+----------+-------+
    |emp_id|            emp_name|emp_city|emp_salary|row_num|
    +------+--------------------+--------+----------+-------+
    |     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
    |     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
    |     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
    |     9|ROBERT           ...|GURGAON |     70000|      4|
    |     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
    |     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
    |     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
    |     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
    |     6|RAJAT TYAGI      ...|UP      |     65000|      9|
    +------+--------------------+--------+----------+-------+
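
    One caveat: because every row shares the same "new_column" value, Spark moves all rows into a single partition to compute row_number, so on a very large DataFrame the zipWithIndex approach may scale better. As for the repartitioning question, once row_num exists it can be used as a hash-repartitioning key to spread the data roughly uniformly (a sketch; the target of 10 partitions is an arbitrary choice):

    # Repartition by hashing the row number; 10 is an arbitrary target count.
    df = df.repartition(10, "row_num")
    df.rdd.getNumPartitions()  # returns 10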