apache-spark

Data locality in Spark and HDFS


I want to understand the mechanism by which Spark chooses which Spark nodes read which files. For example, suppose we have an HDFS cluster and a Spark cluster, and a partitioned dataset written into HDFS in CSV form. So we have a number of files representing different partitions, and each file is in turn stored as a set of HDFS blocks - to my knowledge 128 MB by default (64 MB in older Hadoop versions). These blocks may be stored on different machines. The only way I understand it now is that we would have one partition per HDFS file, that Spark cluster nodes will be assigned randomly to files, and that they will then use some HDFS client to read those files. Is that correct, or does Spark somehow optimize which Spark cluster nodes read which HDFS files?

I read the docs and also the Spark whitepaper, but they do not describe this mechanism. The docs usually just say that Spark decouples compute and storage.


Solution

  • Non-Cloud / On-Premises Spark

    To keep it simple, I leave out Dynamic Resource Allocation and recomputation on Node failure.

    This is the traditional / classic Spark-on-HDFS setup, meaning:

    1. Spark is installed to work with a Resource Manager like, say, YARN, on a Cluster and
    2. When a Spark App starts it acquires a finite set of resources as in Executors and memory on the Worker Nodes via the Resource Manager without knowing what files will be read and
    3. When a Spark App actually runs the code, the code is brought to the data on that same Cluster - see https://spark.apache.org/docs/latest/cluster-overview.html. That is to say, the data is on an HDFS Data Node, which is synonymous with a Worker Node from a Spark perspective. But you may not have gotten Executors on all Nodes. So, the Action that starts it all and reads from the disks will have Spark code shipped to those (Worker / Data) Nodes where data blocks exist for the files - or not, since you may have Executors on some Nodes but not others. That is where data locality comes in: to execute some meaningful logic, you process the data on the same Node if you have an Executor there, so as to avoid network and rack latency. Otherwise, an Executor will need to read from within the rack or across racks / nodes, which is slower. HDFS's Name Node keeps track of where all data blocks and files are. Spark consults this and knows where to ship Tasks ideally, or where to ship Tasks that must read remotely in a less optimized manner, as per above.
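The placement logic described above can be sketched as a toy model. Everything here is invented for illustration (the file names, host names, and block map); real Spark does this inside its scheduler using block locations reported by the Name Node:

```python
# Toy model of locality-aware task placement (illustrative only).
# The NameNode-style block map records which hosts hold each file's blocks;
# the scheduler prefers an Executor on one of those hosts (NODE_LOCAL),
# falling back to any Executor (ANY) when no local one exists.

block_locations = {                       # file -> hosts holding its HDFS blocks
    "part-0000.csv": ["node1", "node2"],
    "part-0001.csv": ["node3", "node4"],
    "part-0002.csv": ["node5", "node6"],  # no Executor on either host
}
executors = ["node1", "node3"]            # hosts where this App got Executors

def place_task(filename):
    """Return (host, locality level) for the task reading `filename`."""
    for host in block_locations[filename]:
        if host in executors:
            return host, "NODE_LOCAL"     # data and Executor on the same host
    return executors[0], "ANY"            # remote read over the network

for f in sorted(block_locations):
    host, level = place_task(f)
    print(f, "->", host, level)
```

Note that `part-0002.csv` still gets processed - just by a non-local Executor, which is exactly the slower remote-read case described above.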

    To answer your 1st question: Spark chooses nothing at random, but optimizes where it can based on Executor placement and Data Locality. Racks and Nodes have a bearing on which Executor is used.
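This preference is also tunable: the scheduler waits briefly for a local slot to free up before settling for a less local one. A config sketch - the property names are real Spark settings, and 3s is the documented default, but check your version's configuration docs:

```
spark.locality.wait        3s   # how long to wait for a more local task slot
spark.locality.wait.node   3s   # per-level overrides; default to spark.locality.wait
spark.locality.wait.rack   3s
```

Raising these favors locality at the cost of idle Executors; setting them to 0 schedules tasks immediately wherever a slot is free.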


    Shuffling between Nodes and spilling of data to disk will also occur during execution, but that is slightly off-topic here.

    As the program moves through the code base, other Executors may be needed for in-memory computations and the like. These may be on the same or other Nodes.

    Spark in the Cloud

    Your 2nd question: "Seems like docs usually say that Spark decouples compute and storage." In the traditional on-premises setup, this is not so, as we bring the processing to the data as much as possible.

    In the Cloud scenario, compute and storage are divorced de facto: you have storage used for data at rest, and dynamically allocated / elastic compute when needed; even reserved instances are still divorced from storage. When reading from object storage such as S3, there is no Name Node reporting block locations, so Spark reads over the network and the data locality aspect of "bringing the code to the data" no longer applies.
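For illustration, the same dataset might be addressed under either scheme; the host and bucket names below are made up:

```
hdfs://namenode:8020/warehouse/sales/   # on-prem: Name Node maps blocks to Data Nodes, locality possible
s3a://my-bucket/warehouse/sales/        # cloud object store: no block locations, every read is remote
```

The application code is the same either way; only the locality optimization disappears in the second case.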