Search code examples
dataframeapache-sparkapache-spark-sqlspark-java

How to union two dataframes which have same number of columns?


Dataframe df1 contains columns : a, b, c, d, e (Empty dataframe)

Dataframe df2 contains columns : b, c, d, e, _c4 (Contains Data)

I want to do a union on these two dataframes. I tried using

df1.union(df2);

This populates data with position. but i want to populated data with name of the columns.

Then I tried with

df1.unionByName(df2, allowMissingColumns= true);

But it throws the error in ``allowMissingColumns= true`. I get to know the error this is because of the version. I use spark version 2.4.4.

df1:

|a|b|c|d|e|
+---------+
| | | | | | 
+---------+

df2:

|b|c|d|e|_c4|
+-----------+
|2|3|5|6|   | 
+-----------+

Expected Output:

|a|b|c|d|e|
+---------+
| |2|3|5|6| 
+---------+

My question is there any other way to override an empty dataframe (df1) with populated dataframe (df2) using the column names? or should i need to change the version in pom.xml file? Kindly pour some suggestions.

Pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>rule</groupId>
  <artifactId>qwerty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>qwerty</name>
  <description>code</description>
  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>4.0.0</version>
        </dependency>

   </dependencies>
   <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <outputDirectory>${project.build.directory}</outputDirectory>
                    <archive>
                        <manifest>
                            <mainClass>qwerty.qwerty</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin> 
                <artifactId>maven-compiler-plugin</artifactId> 
                <configuration> <source>1.8</source> <target>1.8</target> </configuration> 
            </plugin>
        </plugins>
    </build>
</project>

Solution

  • unionByName exists since spark 2.3 but the allowMissingColumns only appeared in spark 3.1, hence the error you obtain in 2.4.

    In spark 2.4, you could try to implement the same behavior yourself. That is, transforming df2 so that it contains all the columns from df1. If a column is not in df2, we can set it to null. In scala, you could do it this way:

    val df2_as1 = df2
        .select(df1
            .columns
            .map(c => if(df2.columns.contains(c)) col(c) else lit(null).as(c))
        : _*)
    // Here, union would work just as well.
    val result = df1.unionByName(df2_as1)
    

    In java, that's obviously much more painful:

    List<String> df2_cols = Arrays.asList(df2.columns());
    // cols is the list of columns contained in df1, but all columns
    // that are not in df2 are set to null.
    List<Column> cols = new ArrayList<>();
    for (String c : df1.columns()) {
        if(df2_cols.contains(c))
              cols.add(functions.col(c));
        else
              cols.add(functions.lit(null).alias(c));
    }
    // We modify df2 so that its schema matches df1's.
    Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());
            
    // Here, union would work just as well.
    Dataset<Row> result = df1.unionByName(df2_as1);