We submit the application with this command:
/home/ubuntu/spark/bin/spark-submit --master yarn --deploy-mode cluster --class "SimpleApp" /home/ubuntu/spark/examples/src/main/scala/sbt/target/scala-2.11/teste_2.11-1.0.jar
It runs the program below:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("query1").master("yarn").getOrCreate()

    // Explicit schema for the trip/fare CSV file
    val header = StructType(Array(
      StructField("medallion", StringType, true),
      StructField("hack_license", StringType, true),
      StructField("vendor_id", StringType, true),
      StructField("rate_code", IntegerType, true),
      StructField("store_and_fwd_flag", StringType, true),
      StructField("pickup_datetime", TimestampType, true),
      StructField("dropoff_datetime", TimestampType, true),
      StructField("passenger_count", IntegerType, true),
      StructField("trip_time_in_secs", IntegerType, true),
      StructField("trip_distance", FloatType, true),
      StructField("pickup_longitude", FloatType, true),
      StructField("pickup_latitude", FloatType, true),
      StructField("dropoff_longitude", FloatType, true),
      StructField("dropoff_latitude", FloatType, true),
      StructField("payment_type", StringType, true),
      StructField("fare_amount", FloatType, true),
      StructField("surcharge", FloatType, true),
      StructField("mta_tax", FloatType, true),
      StructField("trip_amount", FloatType, true),
      StructField("tolls_amount", FloatType, true),
      StructField("total_amount", FloatType, true),
      StructField("zone", StringType, true)))

    // Load the CSV file using the schema above
    val nyct = spark.read
      .format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .schema(header)
      .load("/home/ubuntu/trip_data/trip_data_fare_1.csv")

    nyct.createOrReplaceTempView("nyct_temp_table")

    // Count drop-offs per zone between 08:00 and 19:59, busiest zones first
    spark.time(spark.sql(
      """SELECT zone, COUNT(*) AS accesses
        |FROM nyct_temp_table
        |WHERE (HOUR(dropoff_datetime) >= 8 AND HOUR(dropoff_datetime) <= 19)
        |GROUP BY zone
        |ORDER BY accesses DESC""".stripMargin).show())
  }
}
The idea is to run the query in this program on a cluster with Spark and Hadoop. At the end of the execution, however, it fails with an error reading the CSV file from the path /home/ubuntu/trip_data/trip_data_fare_1.csv. (The error was posted as a screenshot.)
I think the problem is that the worker nodes can't find the file in the master's directory. Does anyone know how I can fix this and run the program on the cluster?
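Since you are running in cluster mode, the driver and executors run on cluster nodes and cannot rely on a file that exists only on one machine's local filesystem, so the file should be in HDFS. You can copy files from the local filesystem to HDFS with the following command: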
hadoop fs -put source_path dest_path
Then use the dest_path in your code.
In your case, run this on the host that has the local file:
hadoop fs -put /home/ubuntu/trip_data/trip_data_fare_1.csv <some_hdfs_location>
Verify that the copy has worked by doing the following:
hdfs dfs -ls <some_hdfs_location>
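Once the file is in HDFS, the read in your code could look roughly like the sketch below. The HDFS path here is a hypothetical destination (I don't know what you will choose for <some_hdfs_location>), and the object name ReadFromHdfs is only for illustration. The hdfs:/// form assumes the cluster's default filesystem (fs.defaultFS) points at your HDFS namenode.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative object name, not from the question
object ReadFromHdfs {
  def main(args: Array[String]): Unit = {
    // The master is supplied by spark-submit (--master yarn)
    val spark = SparkSession.builder().appName("query1").getOrCreate()

    // ASSUMPTION: hypothetical HDFS destination; replace it with the
    // location you passed to `hadoop fs -put`
    val hdfsPath = "hdfs:///user/ubuntu/trip_data/trip_data_fare_1.csv"

    // Same reader options as in the question; add .schema(header)
    // here to keep the explicit schema defined in the question
    val nyct = spark.read
      .format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .load(hdfsPath)

    nyct.createOrReplaceTempView("nyct_temp_table")
    nyct.printSchema()

    spark.stop()
  }
}
```

Because the path points to HDFS, the same read works no matter which node YARN picks for the driver and executors.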