javaapache-sparkhadoopapache-spark-sql

org.apache.hadoop.fs.ParentNotDirectoryException - is not a directory


I am trying to append new records into existing csv file using Spark SQL.

Getting Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory) as the input is a File and not a directory.

Spark (Java) code and exception stack trace mentioned below.

How to resolve this?

Environment Info - Mac OS

MyiCloud@MyMC someDir % uname -a
Darwin MyMC.local 21.6.0 Darwin Kernel Version 21.6.0: Fri Sep 15 16:17:23 PDT 2023; root:xnu-8020.240.18.703.5~1/RELEASE_X86_64 x86_64

Hadoop Environment Info

MyiCloud@MyMC someDir % hadoop version

Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1be78238728da9266a4f88195058f08fd012bf9c
Compiled by ubuntu on 2023-06-18T08:22Z
Compiled on platform linux-x86_64
Compiled with protoc 3.7.1
From source with checksum 5652179ad55f76cb287d9c633bb53bbd
This command was run using /usr/local/Cellar/hadoop/3.3.6/libexec/share/hadoop/common/hadoop-common-3.3.6.jar

File Permission - HDFS

MyiCloud@MyMC sbin % hdfs dfs -ls /tmp/spark/input/csv/
2023-11-20 01:35:51,752 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rwxr-xr-x   1 MyiCloud supergroup        268 2023-11-20 00:58 /tmp/spark/input/csv/orders.csv

Input file - orders.csv

firstName,lastName,state,quantity,revenue,timestamp
Jean Georges,Perrin,NC,1,300,1551903533
Jean Georges,Perrin,NC,2,120,1551903567
Jean Georges,Perrin,CA,4,75,1551903599
Holden,Karau,CA,6,37,1551904299
Ginni,Rometty,NY,7,91,1551916792
Holden,Karau,CA,4,153,1552876129

Apache Spark with Java Code

package org.example.ch11;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        InsertApp inApp = new InsertApp();
        inApp.append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]").getOrCreate();
        Dataset<Row> df = spark.read().format("csv")
                .option("inferSchema", true)
                .option("header", true)
                .load("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
        System.out.println("Printing csv content..");
        df.show();

        df.createOrReplaceTempView("orders_view");
        try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(
                            Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                    ),
                    df.schema()
            );
            newDf.createOrReplaceTempView("new_order_view");

            System.out.println("Inserting data into orders_view view from new_order_view view...");
            Dataset<Row> mergedDf = spark.sql("INSERT INTO orders_view SELECT * FROM new_order_view");
            mergedDf.show();
            System.out.println("Completed...");
        }
    }
}

Exception - Stack Trace

Printing csv content..
23/11/20 01:38:10 INFO FileSourceStrategy: Pushed Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Post-Scan Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Output Data Schema: struct<firstName: string, lastName: string, state: string, quantity: int, revenue: int ... 1 more field>
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 22.283375 ms
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 302.5 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 27.3 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.6:49627 (size: 27.3 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 4 from show at InsertApp.java:27
23/11/20 01:38:10 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/20 01:38:10 INFO SparkContext: Starting job: show at InsertApp.java:27
23/11/20 01:38:10 INFO DAGScheduler: Got job 2 (show at InsertApp.java:27) with 1 output partitions
23/11/20 01:38:10 INFO DAGScheduler: Final stage: ResultStage 2 (show at InsertApp.java:27)
23/11/20 01:38:10 INFO DAGScheduler: Parents of final stage: List()
23/11/20 01:38:10 INFO DAGScheduler: Missing parents: List()
23/11/20 01:38:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27), which has no missing parents
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 14.4 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.8 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.0.6:49627 (size: 6.8 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1388
23/11/20 01:38:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27) (first 15 tasks are for partitions Vector(0))
23/11/20 01:38:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
23/11/20 01:38:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2) (192.168.0.6, executor driver, partition 0, NODE_LOCAL, 4877 bytes) taskResourceAssignments Map()
23/11/20 01:38:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
23/11/20 01:38:10 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.0.6:49627 in memory (size: 7.7 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO FileScanRDD: Reading File path: hdfs://localhost:8020/tmp/spark/input/csv/orders.csv, range: 0-268, partition values: [empty row]
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 19.008744 ms
23/11/20 01:38:10 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1789 bytes result sent to driver
23/11/20 01:38:10 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 115 ms on 192.168.0.6 (executor driver) (1/1)
23/11/20 01:38:10 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
23/11/20 01:38:10 INFO DAGScheduler: ResultStage 2 (show at InsertApp.java:27) finished in 0.141 s
23/11/20 01:38:10 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
23/11/20 01:38:10 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
23/11/20 01:38:10 INFO DAGScheduler: Job 2 finished: show at InsertApp.java:27, took 0.150319 s
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 40.368014 ms
+------------+--------+-----+--------+-------+----------+
|   firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges|  Perrin|   NC|       1|    300|1551903533|
|Jean Georges|  Perrin|   NC|       2|    120|1551903567|
|Jean Georges|  Perrin|   CA|       4|     75|1551903599|
|      Holden|   Karau|   CA|       6|     37|1551904299|
|       Ginni| Rometty|   NY|       7|     91|1551916792|
|      Holden|   Karau|   CA|       4|    153|1552876129|
+------------+--------+-----+--------+-------+----------+

Inserting data into orders_view view from new_order_view view...
23/11/20 01:38:10 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/11/20 01:38:10 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/11/20 01:38:10 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23/11/20 01:38:10 INFO SparkUI: Stopped Spark web UI at http://192.168.0.6:4040
23/11/20 01:38:10 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/20 01:38:10 INFO MemoryStore: MemoryStore cleared
23/11/20 01:38:10 INFO BlockManager: BlockManager stopped
23/11/20 01:38:10 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/20 01:38:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/20 01:38:10 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2426)
    at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2400)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1324)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1321)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1338)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1313)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.example.ch11.InsertApp.append(InsertApp.java:40)
    at org.example.ch11.InsertApp.main(InsertApp.java:15)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.ParentNotDirectoryException): /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
    at org.apache.hadoop.ipc.Client.call(Client.java:1457)
    at org.apache.hadoop.ipc.Client.call(Client.java:1367)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:656)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy19.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2424)
    ... 31 more
23/11/20 01:38:11 INFO ShutdownHookManager: Shutdown hook called
23/11/20 01:38:11 INFO ShutdownHookManager: Deleting directory /private/var/folders/6w/v2bcqfkd6kl5ylt9k5_q0lb80000gn/T/spark-cf226489-3921-439a-9653-b33159d3a230

Process finished with exit code 1

Solution

  • Update your logic to this:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Collections;
    
    public class InsertApp {
    
        public static void main(String[] args) {
            InsertApp inApp = new InsertApp();
            inApp.append();
        }
    
        private void append() {
            SparkSession spark = SparkSession.builder()
                    .appName("InsertProcessor")
                    .master("local[*]").getOrCreate();
            Dataset<Row> df = spark.read()
                    .option("inferSchema", true)
                    .option("header", true)
                    .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
    
            df.createOrReplaceTempView("order_view");
            try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
                Dataset<Row> newDf = spark.createDataFrame(
                        jsc.parallelize(
                                Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                        ),
                        df.schema()
                );
                newDf.createOrReplaceTempView("new_order_view");
                Dataset<Row> df1 = spark.sql("SELECT * from order_view");
                System.out.println("Data From Order View:");
                df1.show();
                Dataset<Row> df2 = spark.sql("SELECT * from new_order_view");
                System.out.println("Data From New Order View:");
                df2.show();
                Dataset<Row> mergedDf = df1.union(df2).distinct();
                mergedDf.createOrReplaceTempView("order_view");
                Dataset<Row> df3 = spark.sql("SELECT * from order_view");
                System.out.println("Data From Order View After Merging:");
                df3.show();
                df3.write().mode(SaveMode.Overwrite)
                    .option("inferSchema", true)
                    .option("header", true).csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
            }
        }
    }
    

    Output :

    Data From Order View:
    +------------+--------+-----+--------+-------+----------+
    |   firstName|lastName|state|quantity|revenue| timestamp|
    +------------+--------+-----+--------+-------+----------+
    |Jean Georges|  Perrin|   NC|       1|    300|1551903533|
    |Jean Georges|  Perrin|   NC|       2|    120|1551903567|
    |Jean Georges|  Perrin|   CA|       4|     75|1551903599|
    |      Holden|   Karau|   CA|       6|     37|1551904299|
    |       Ginni| Rometty|   NY|       7|     91|1551916792|
    |      Holden|   Karau|   CA|       4|    153|1552876129|
    +------------+--------+-----+--------+-------+----------+
    
    Data From New Order View:
    +---------+--------+-----+--------+-------+----------+
    |firstName|lastName|state|quantity|revenue| timestamp|
    +---------+--------+-----+--------+-------+----------+
    |        V|       S|   TN|      10|     20|1551904299|
    +---------+--------+-----+--------+-------+----------+
    
    Data From Order View After Merging:
    +------------+--------+-----+--------+-------+----------+
    |   firstName|lastName|state|quantity|revenue| timestamp|
    +------------+--------+-----+--------+-------+----------+
    |Jean Georges|  Perrin|   CA|       4|     75|1551903599|
    |       Ginni| Rometty|   NY|       7|     91|1551916792|
    |Jean Georges|  Perrin|   NC|       1|    300|1551903533|
    |      Holden|   Karau|   CA|       4|    153|1552876129|
    |Jean Georges|  Perrin|   NC|       2|    120|1551903567|
    |      Holden|   Karau|   CA|       6|     37|1551904299|
    |           V|       S|   TN|      10|     20|1551904299|
    +------------+--------+-----+--------+-------+----------+
    

    Few points to mention:

    • Use csv method to load .csv files instead of load method.
    • I have used union method to combine both rows from two different table views and taken the distinct of it by using distinct method and then replaced the existing data of order_view by newly created merged data to get the job done.

    Hadoop UI Output (containing new merged data):

    enter image description here

    enter image description here

    I guess this will be an easier way to approach. See if this helps.