apache-spark, hadoop, pyspark, apache-spark-sql, hadoop2

Problem with data locality when running a Spark query that should be node-local on Apache Hadoop


I have a Hadoop cluster and use Apache Spark to query Parquet files stored on HDFS. For example, I use the following PySpark code to find a word in the Parquet files:

df = spark.read.parquet("hdfs://test/parquets/*")
df.filter(df['word'] == "jhon").show()

After running this code, I go to the Stages tab of the Spark application UI and see that the locality level summary is set to Any. In contrast, because of this query's nature, it should run at least at the NODE_LOCAL locality level. When I check the cluster's network I/O while the query is running, I find that it uses the network (network I/O increases while the query runs). The strange part is that the number shown in the Spark UI's shuffle section is minimal.
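For reference, the per-task locality can also be logged from the driver instead of read off the UI. The sketch below (run from spark-shell, not part of my original debugging) registers a SparkListener that prints each finished task's host and locality level:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print each finished task's host and locality level to the driver's stdout,
// as an alternative to reading the Locality Level Summary in the Stages tab.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    println(s"stage ${taskEnd.stageId} task ${info.taskId}: host=${info.host} locality=${info.taskLocality}")
  }
})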

With Russell Spitzer's help on the Apache Spark mailing list, I ran the following code to find each partition's preferred locations and get closer to the root cause of this problem. The result brought me one step closer to solving it: the preferred locations are hostnames, not IPs, yet Spark uses the executors' IP addresses to match them against the preferred locations and achieve data locality.

scala> def getRootRdd(rdd: RDD[_]): RDD[_] = { if (rdd.dependencies.isEmpty) rdd else getRootRdd(rdd.dependencies(0).rdd) }
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("hdfs://test/parquets/*").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res12: Array[Seq[String]] = Array(WrappedArray(datanode-1, datanode-2, datanode-3), WrappedArray(datanode-2, datanode-3, datanode-4), WrappedArray(datanode-3, datanode-4, datanode-5),...
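To make the mismatch visible directly in the shell, the two sides can be compared (a sketch reusing the scan RDD from above; getExecutorMemoryStatus reports each executor's block manager as a host:port string and also includes the driver):

// Compare the partitions' preferred locations (hostnames from HDFS) with the
// hosts the executors registered under (IPs in this cluster). An empty
// intersection means the scheduler cannot place tasks NODE_LOCAL.
val preferredHosts = scan.partitions.flatMap(scan.preferredLocations).toSet
val executorHosts  = spark.sparkContext.getExecutorMemoryStatus.keys.map(_.split(":")(0)).toSet
println(s"preferred locations: $preferredHosts")
println(s"executor hosts:      $executorHosts")
println(s"overlap:             ${preferredHosts.intersect(executorHosts)}")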

Now I am trying to find a way to make Spark resolve the hostnames first and then match them against the executors' IPs. Are there any suggestions?


Solution

  • This problem arose because the preferred locations that Spark obtains from Hadoop for the partitions are datanode hostnames, while the Spark workers registered with the Spark master by IP address. Spark tries to schedule each task on the executor that holds its partition locally, but because executors are identified by IP and partitions by hostname, the scheduler cannot match them, and tasks always run at the "Any" locality level. To solve this, run the Spark workers with the -h [hostname] flag. The workers then register with the master by hostname instead of IP, which solves the problem; a quick verification sketch follows below.
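One way to verify the fix (a spark-shell sketch; getExecutorInfos also lists the driver) is to print the hosts the executors are now registered under and check that they are the datanode hostnames returned as preferred locations; after the workers are restarted with -h, the filter stage should run at NODE_LOCAL:

// After restarting the workers with the -h flag, the executor hosts should be
// datanode hostnames that match the preferred locations reported by HDFS.
spark.sparkContext.statusTracker.getExecutorInfos.foreach(info => println(info.host))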