I am doing a transformation in PySpark code and creating new columns. I have observed that source_df
is getting replaced by the new columns. Is it possible to merge the new columns with the existing dataframe columns?
source_df = source_df.rdd.map(lambda x: winner_calc(x["org_attributes_dict"])).toDF()
Output
+---------+---------+------------+
|winner_bn|winner_hj|winner_value|
+---------+---------+------------+
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
| No Match|        2|           1|
+---------+---------+------------+
I'm sharing sample code since I can't share the actual code. As you can see in the final result, the actual dataframe is getting overwritten with the new value 'H' for all rows. I want to add that as a new column in the existing dataframe instead of overwriting it.
import sys,os
import concurrent.futures
from concurrent.futures import *
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.context import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql.functions import array
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
#from pyspark.sql.functions import StringType
from pyspark.sql.functions import row_number,lit,col,expr
from pyspark.sql.window import Window
import requests
import json
import traceback
import base64
import pandas as pd
import pyspark.sql.types as T
from pyspark.sql import functions as F
def val():
    # returns the 1-tuple ('H',)
    return tuple('H')

###############################

class JobBase(object):
    spark = None

    def __start_spark_glue_context(self):
        conf = SparkConf().setAppName("python_thread")
        self.sc = SparkContext(conf=conf)
        self.glueContext = GlueContext(self.sc)
        self.spark = self.glueContext.spark_session

    def execute(self):
        self.__start_spark_glue_context()
        d = [{"account_number": 1, "v1": 100830, "v2": 1000},
             {"account_number": 2, "v1": 2000, "v2": 2},
             {"account_number": 3, "v1": 555, "v2": 55}]
        df = self.spark.createDataFrame(d)
        df.show()
        try:
            # this replaces every row with the 1-tuple ('H',), dropping the original columns
            df = df.rdd.map(lambda x: val()).toDF()
        except Exception as exp:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string})
            print(err_msg)
        df.show()


def main():
    job = JobBase()
    job.execute()


if __name__ == '__main__':
    main()
Output of main()
+--------------+------+----+
|account_number|    v1|  v2|
+--------------+------+----+
|             1|100830|1000|
|             2|  2000|   2|
|             3|   555|  55|
+--------------+------+----+
+---+
| _1|
+---+
|  H|
|  H|
|  H|
+---+
Replace the line
df = df.rdd.map(lambda x: val()).toDF()
with
df = df.rdd.map(lambda row: row + val()).toDF(df.columns + ["v3"])
This works because a Row is a subclass of tuple: row + val() appends 'H' to each row's existing values, and toDF(df.columns + ["v3"]) keeps the original column names and adds the new one.
Output:
+--------------+------+----+---+
|account_number|    v1|  v2| v3|
+--------------+------+----+---+
|             1|100830|1000|  H|
|             2|  2000|   2|  H|
|             3|   555|  55|  H|
+--------------+------+----+---+
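The same pattern should carry over to your original source_df transformation. A minimal sketch, assuming winner_calc returns a 3-tuple in the order (winner_bn, winner_hj, winner_value) and that org_attributes_dict is a column of source_df:

new_cols = ["winner_bn", "winner_hj", "winner_value"]
source_df = (
    source_df.rdd
    # keep the existing fields and append the three winner values to each row
    .map(lambda row: tuple(row) + tuple(winner_calc(row["org_attributes_dict"])))
    .toDF(source_df.columns + new_cols)
)

If you want to avoid the rdd round trip (which can lose column types), a UDF that returns a struct does the same thing with the DataFrame API. This uses the F / T aliases already imported in your script; the field types below are assumptions, so adjust them to whatever winner_calc actually returns:

winner_schema = T.StructType([
    T.StructField("winner_bn", T.StringType()),
    T.StructField("winner_hj", T.StringType()),
    T.StructField("winner_value", T.StringType()),
])
winner_udf = F.udf(lambda d: winner_calc(d), winner_schema)

source_df = (
    source_df
    .withColumn("winner", winner_udf(F.col("org_attributes_dict")))
    # expand the struct into three top-level columns, then drop the struct itself
    .select("*", "winner.*")
    .drop("winner")
)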