Tags: pyspark, rdd

Way to merge RDD map result columns into the same dataframe


I am performing a transformation in my PySpark code that creates new columns. I have observed that source_df is being replaced by a dataframe containing only the new columns. Is it possible to merge the new columns with the existing dataframe columns?

source_df=source_df.rdd.map(lambda x:winner_calc(x["org_attributes_dict"])).toDF()

Output:
+---------+----------+------------+
|winner_bn| winner_hj|winner_value|
+---------+----------+------------+
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
+---------+----------+------------+

I am sharing sample code because I cannot share the actual code. As you can see in the final result, the dataframe is overwritten with the new value 'H' for all rows. I want to add that as a new column to the existing dataframe instead of overwriting it.

import sys,os
import concurrent.futures
from concurrent.futures import *
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.context import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql.functions import array
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import  udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
#from pyspark.sql.functions import StringType
from pyspark.sql.functions import row_number,lit,col,expr
from pyspark.sql.window import Window
import requests
import json
import traceback
import base64
import pandas as pd 
import pyspark.sql.types as T
from pyspark.sql import functions as F


def val():
    # Returns a 1-tuple ('H',) representing the new column value
    return tuple('H')

###############################

class JobBase(object):
    spark=None
    def __start_spark_glue_context(self):
        conf = SparkConf().setAppName("python_thread")
        self.sc = SparkContext(conf=conf)
        self.glueContext = GlueContext(self.sc)
        self.spark = self.glueContext.spark_session
            
    def execute(self):
        self.__start_spark_glue_context()
        d =[{"account_number": 1, "v1": 100830, "v2": 1000},
                {"account_number": 2, "v1": 2000, "v2": 2},
                {"account_number": 3, "v1": 555, "v2": 55}]

        df = self.spark.createDataFrame(d)
        df.show()
        try:
            df = df.rdd.map(lambda x: val()).toDF()
        except Exception as exp:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string})
            print(err_msg)
        
        df.show()

        
def main():
    job = JobBase()
    job.execute() 
    

if __name__ == '__main__':
    main()

Output of main():

+--------------+------+----+
|account_number|    v1|  v2|
+--------------+------+----+
|             1|100830|1000|
|             2|  2000|   2|
|             3|   555|  55|
+--------------+------+----+

+---+
| _1|
+---+
|  H|
|  H|
|  H|
+---+

Solution

  • Replace the line

    df = df.rdd.map(lambda x: val()).toDF()
    

    with

    df = df.rdd.map(lambda row: row+val()).toDF(df.columns + ["v3"])
    

    Output:

    +--------------+------+----+---+
    |account_number|    v1|  v2| v3|
    +--------------+------+----+---+
    |             1|100830|1000|  H|
    |             2|  2000|   2|  H|
    |             3|   555|  55|  H|
    +--------------+------+----+---+
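
    This works because a Row is a subclass of tuple, so row + val() concatenates
    the existing row values with the 1-tuple ('H',), and toDF() is then given the
    full list of column names.

    The same pattern should carry over to the original winner_calc transformation.
    A minimal sketch, assuming winner_calc returns a tuple (or list) of the three
    winner values:

    # Assumption: winner_calc(...) returns the three winner values in order
    # (winner_bn, winner_hj, winner_value); adjust if it returns a Row or dict.
    source_df = source_df.rdd.map(
        lambda row: tuple(row) + tuple(winner_calc(row["org_attributes_dict"]))
    ).toDF(source_df.columns + ["winner_bn", "winner_hj", "winner_value"])

    For a constant value like 'H', df.withColumn("v3", F.lit("H")) would avoid the
    round trip through the RDD entirely.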