apache-spark, hadoop, cluster-computing, hadoop-yarn, spark-submit

Error: path does not exist in spark-submit with Hadoop


We are using the following command to run the script below:

    /home/ubuntu/spark/bin/spark-submit --master yarn --deploy-mode cluster --class "SimpleApp" /home/ubuntu/spark/examples/src/main/scala/sbt/target/scala-2.11/teste_2.11-1.0.jar

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._


object SimpleApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("query1").master("yarn").getOrCreate()

    // Explicit schema for the taxi trip/fare CSV
    val header = StructType(Array(
      StructField("medallion", StringType, true),
      StructField("hack_license", StringType, true),
      StructField("vendor_id", StringType, true),
      StructField("rate_code", IntegerType, true),
      StructField("store_and_fwd_flag", StringType, true),
      StructField("pickup_datetime", TimestampType, true),
      StructField("dropoff_datetime", TimestampType, true),
      StructField("passenger_count", IntegerType, true),
      StructField("trip_time_in_secs", IntegerType, true),
      StructField("trip_distance", FloatType, true),
      StructField("pickup_longitude", FloatType, true),
      StructField("pickup_latitude", FloatType, true),
      StructField("dropoff_longitude", FloatType, true),
      StructField("dropoff_latitude", FloatType, true),
      StructField("payment_type", StringType, true),
      StructField("fare_amount", FloatType, true),
      StructField("surcharge", FloatType, true),
      StructField("mta_tax", FloatType, true),
      StructField("trip_amount", FloatType, true),
      StructField("tolls_amount", FloatType, true),
      StructField("total_amount", FloatType, true),
      StructField("zone", StringType, true)))

    // Read the CSV with the schema above and register it as a temp view
    val nyct = spark.read.format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .schema(header)
      .load("/home/ubuntu/trip_data/trip_data_fare_1.csv")
    nyct.createOrReplaceTempView("nyct_temp_table")

    // Count dropoffs per zone between 08:00 and 19:59, busiest zones first
    spark.time(spark.sql("""SELECT zone, COUNT(*) AS accesses FROM nyct_temp_table WHERE (HOUR(dropoff_datetime) >= 8 AND HOUR(dropoff_datetime) <= 19) GROUP BY zone ORDER BY accesses DESC""").show())
  }
}

The idea is to run the query in this script on a cluster with Spark and Hadoop, but at the end of the execution the job fails with a "path does not exist" error when reading the CSV file from /home/ubuntu/trip_data/trip_data_fare_1.csv.

I think the problem is that the worker nodes can't find the file, since it only exists in a directory on the master. Does anyone know how I can fix this and run the script on the cluster?


Solution

  • Since you are running on a cluster, the file needs to be in HDFS, where every node can reach it; a path on the master's local disk is not visible to the other nodes. You can copy files from the local filesystem to HDFS using the following command:

    hadoop fs -put source_path dest_path
    

    Then use that dest_path in your code (see the load-call sketch after these steps).

    In your case, run this on the host that has the local file:

    hadoop fs -put /home/ubuntu/trip_data/trip_data_fare_1.csv <some_hdfs_location>
    

    Verify that the copy has worked by doing the following:

    hdfs dfs -ls <some_hdfs_location>
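
    Once the file is in HDFS, point the load call at the HDFS location instead of the local path. A minimal sketch of the change, assuming the file was copied to the hypothetical location hdfs:///user/ubuntu/trip_data/trip_data_fare_1.csv (substitute whatever <some_hdfs_location> you actually used):

    // Same read as before, but from HDFS, which every executor can reach
    val nyct = spark.read.format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .schema(header)
      .load("hdfs:///user/ubuntu/trip_data/trip_data_fare_1.csv") // hypothetical HDFS path
    nyct.createOrReplaceTempView("nyct_temp_table")

    On YARN the default filesystem is normally HDFS, so a plain path like /user/ubuntu/trip_data/trip_data_fare_1.csv would typically resolve to HDFS as well; the explicit hdfs:// scheme just makes the intent unambiguous.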