scala, apache-spark

Create a new table by joining two tables on common column names with matching data types


I need to create a new table "c" from tables "a" and "b". The columns of table "c" should be those that are common to tables "a" and "b" and have matching data types as well.

Suppose the table structure is as below -

Table a          Table b
col1 : int       col1 : int
col2 : string    col2 : string
col3 : string    col3 : int
col4 : string    col5 : string

The output should have below columns -

Table c
col1 : int
col2 : string

From a data point of view, the input and output should look like this -

TABLE A

col1  col2  col3  col4
5     ABC   DEF   GHI

TABLE B

col1  col2  col3  col5
5     ABC   6     GHI

TABLE C (output table)

col1  col2
5     ABC
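For reference, the sample input tables can be built as DataFrames (a minimal sketch; the local SparkSession setup and the literal row values are assumptions based on the data shown above):

```scala
import org.apache.spark.sql.SparkSession

// Minimal local setup -- an assumption for trying this out, not part of the question.
val spark = SparkSession.builder().master("local[*]").appName("common-cols").getOrCreate()
import spark.implicits._

// Table a: col3 is a string; table b: col3 is an int, and it has col5 instead of col4.
val df1 = Seq((5, "ABC", "DEF", "GHI")).toDF("col1", "col2", "col3", "col4")
val df2 = Seq((5, "ABC", 6, "GHI")).toDF("col1", "col2", "col3", "col5")
```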

The pseudocode I am working with is -

val df1Cols = df1.columns.toSet
val df2Cols = df2.columns.toSet
val commonCols = df1Cols.intersect(df2Cols)

val commonColExtract = df1.schema.filter(field => commonCols.contains(field.name)).map(_.name).toList

Output dataframe:
val df = df1.join(df2, commonColExtract, "inner")
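Note that a name-only intersection like the one above would also pick up col3, whose types differ between the two tables. A plain-Scala illustration of the gap (column sets copied from the sample schemas):

```scala
// Name-only intersection, as in the pseudocode above, ignores data types:
val df1Cols = Set("col1", "col2", "col3", "col4") // table a's columns
val df2Cols = Set("col1", "col2", "col3", "col5") // table b's columns
val commonCols = df1Cols.intersect(df2Cols)
// commonCols is Set(col1, col2, col3) -- col3 slips through even though
// it is string in table a and int in table b
```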

Solution

  • To derive commonColExtract, you need to compare both the column names and the data types of the columns.

    val df1Cols = df1.schema.map(c => (c.name, c.dataType.simpleString))
    //df1Cols: Seq[(String, String)] = List((col1,int), (col2,string), (col3,string), (col4,string))
    
    val df2Cols = df2.schema.map(c => (c.name, c.dataType.simpleString))
    //df2Cols: Seq[(String, String)] = List((col1,int), (col2,string), (col3,int), (col5,string))
    
    val commonColExtract = df1Cols.intersect(df2Cols).map(_._1)
    //commonColExtract: Seq[String] = List(col1, col2)
    

    Then you can join with commonColExtract:

    df1.join(df2, commonColExtract, "inner").select(commonColExtract.head, commonColExtract.tail: _*).show
    
    //+----+----+
    //|col1|col2|
    //+----+----+
    //|   5| ABC|
    //+----+----+
    
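If table "c" needs to exist as an actual table rather than just a DataFrame, the joined result can be registered or persisted (a sketch; the temp-view approach vs. a managed table depends on your setup, and the overwrite save mode is an assumption):

```scala
// Sketch: materialize the joined result as table "c".
val dfC = df1.join(df2, commonColExtract, "inner")
  .select(commonColExtract.head, commonColExtract.tail: _*)

// Session-scoped view, queryable via spark.sql("select * from c"):
dfC.createOrReplaceTempView("c")

// Or, to write a managed table (requires a configured catalog / Hive support):
// dfC.write.mode("overwrite").saveAsTable("c")
```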

    You can alias df1 and df2 if you want to select columns from a specific dataframe.
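    For example, aliasing can look like this (a sketch; the alias names a and b and the explicit join condition are illustrative, not part of the original answer):

    ```scala
    import org.apache.spark.sql.functions.col

    // Alias each side so columns can be disambiguated after the join.
    // Building the condition explicitly keeps both a.colX and b.colX available.
    val joined = df1.as("a").join(
      df2.as("b"),
      commonColExtract.map(c => col(s"a.$c") === col(s"b.$c")).reduce(_ && _),
      "inner")

    // Select the common columns from df1's side specifically:
    joined.select(commonColExtract.map(c => col(s"a.$c")): _*).show
    ```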