I am beginner in Spark and I am trying to understand its architecture. I understand that in ideal case, cluster manager assigns tasks to executors which are running on same node where data required for processing is also present. But what if a node where data is present does not have any available executors?
The broad idea behind how spark is working with co-located data is this:
If you use the SparkSession provided tools to read a DataFrame (see DataFrameReader Documentation) then an execution graph is created which will try to read data node-local. I.e. each Spark executor will read data which resides on the local-to-this executor part of a distributed storage: For example local HDFS-blocks. This requires that you have partitioning information on the data store, and use this to create a DataFrameReader. This is the proper way to use Spark with big data, since it allows near-arbitrary scaling.
By Rick Moritza from here
There are use-cases where the data is not co-located with the spark cluster and spark has to figure out what is the best method to bring the data to the executors. Another use-case is exactly what you said, one node doesn't have enough resources to start the executor. For all of these use-cases spark handle the issues following some rules:
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:
- PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible
- NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
- NO_PREF - data is accessed equally quickly from anywhere and has no locality preference
- RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
- ANY - data is elsewhere on the network and not in the same rack
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same server, or b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see thespark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
And preferably a difference of local data sorting: PROCESS_LOCAL> NODE_LOCAL> NO_PREF> RACK_LOCAL
From here
If data is not co-located with your spark application, as an example you read data from S3:
The main problem with S3 is that the consumers no longer have data locality and all reads need to transfer data across the network, and S3 performance tuning itself is a black box.When using HDFS and getting perfect data locality, it is possible to get ~3GB/node local read throughput on some of the instance types (e.g. i2.8xl, roughly 90MB/s per core). DBIO, our cloud I/O optimization module, provides optimized connectors to S3 and can sustain ~600MB/s read throughput on i2.8xl (roughly 20MB/s per core).
That is to say, on a per node basis, HDFS can yield 6X higher read throughput than S3. Thus, given that the S3 is 10x cheaper than HDFS, we find that S3 is almost 2x better compared to HDFS on performance per dollar.
However, a big benefit with S3 is we can separate storage from compute, and as a result, we can just launch a larger cluster for a smaller period of time to increase throughput, up to allowable physical limits. This separation of compute and storage also allow for different Spark applications (such as a data engineering ETL job and an ad-hoc data science model training cluster) to run on their own clusters, preventing concurrency issues that affect multi-user fixed-sized Hadoop clusters. This separation (and the flexible accommodation of disparate workloads) not only lowers cost but also improves the user experience.
From here