apache-spark · pyspark · networking · distributed-computing

In Apache Spark, is there a way to force DataFrame execution on specific nodes?


First, let me describe my setup. I have two PCs connected via Ethernet. PC A acts as both the Master and a Worker node, while PC B operates solely as a Worker node. Due to certain limitations, I can't use distributed storage systems such as HDFS or HBase, or databases. Therefore, I need to create DataFrames directly from a local folder (/root/data) that exists on both A and B.

A has files 1.txt through 2000.txt in /root/data, and B has files 2001.txt through 4000.txt in /root/data. My task is to count the words in each text file using a pandas_udf. The resulting DataFrame should consist of 4000 rows, with the word count as a column value. Importantly, files 1.txt to 2000.txt must be processed on A, while files 2001.txt to 4000.txt must be processed on B. Below is my code.

from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

# `conf` is defined earlier in my script.
spark = SparkSession.builder.config(conf=conf).appName("wordcount").getOrCreate()

# Iterator-of-Series pandas UDF: for each input batch of paths it yields one
# Series of word counts with the same length as the batch.
@pandas_udf(IntegerType())
def my_udf_wordCount(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for path_series in iterator:
        counts = []
        for path in path_series:
            file_path = path.strip()
            with open(file_path, 'r') as file:
                text = file.read()
                words = text.split()
                counts.append(len(words))
        yield pd.Series(counts)

file_paths = [f"/root/data/{i}.txt" for i in range(1, 4001)]

data = [(path,) for path in file_paths]
df = spark.createDataFrame(data, ["path"])

df = df.withColumn("count", my_udf_wordCount(df['path']))

save_path = "/root/data/result"
df.write.format('com.databricks.spark.csv') \
        .mode('overwrite') \
        .option("header", "true") \
        .save(save_path)

Of course, if I submit the code like that, it fails because the processing of 1.txt to 4000.txt gets mixed between nodes A and B: for example, one of the /root/data/2001.txt~4000.txt paths ends up on node A, or one of the 1.txt~2000.txt paths ends up on node B, where the file does not exist.

What I want is for tasks to be distributed to nodes based on the 'path' column of each of the 4000 DataFrame rows: a task with '/root/data/2.txt' should run on node A, while a task with '/root/data/3000.txt' should run on node B. How can the code be modified to achieve this?

Additionally, all 4000 tasks should be performed from a single DataFrame, and the job must be submitted to the cluster containing both node A and node B; submitting separate jobs for node A and node B is not an option. Fault tolerance can be disregarded for now.

How can tasks be distributed to the nodes where the files referenced by the DataFrame's 'path' column actually exist?


Solution

  • Try creating the RDD by passing (file_name, node_host) tuples to Scala's SparkContext.makeRDD() function, and process the files using the RDD.mapPartitions() function.

    Update: I have not been able to make sc._jsc.makeRDD() work in PySpark: it fails with a "Py4JException: Method makeRDD([class java.util.ArrayList]) does not exist" error. Please try making it work or create a ParallelCollectionRDD another way yourself. If that fails, I'd suggest taking the same approach with the Scala API if you can; it's Spark's main language and has the most complete API (a rough Scala sketch is included after the Python snippet below).

    # Processes a single file; returns an iterator with a single (path, count) tuple.
    def word_count(iterator):
        path = next(iterator)
        with open(path) as f:
            ...
    
    # Creates a partition for each row/path and assigns a preferred location to it.
    count_by_path = (
        sc._jsc.makeRDD([("/root/data/1.txt", ["host_1"]), ... , ("/root/data/4000.txt", ["host_2"])])
            .mapPartitions(word_count)
    )
    
    (spark.createDataFrame(count_by_path, ['path', 'count'])
        .write
        . ...)
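
    For completeness, here is a rough, untested Scala sketch of the same approach, assuming host_1 and host_2 stand for the actual hostnames of nodes A and B as the cluster manager reports them:

    import org.apache.spark.sql.SparkSession
    import scala.io.Source

    val spark = SparkSession.builder.appName("wordcount").getOrCreate()
    val sc = spark.sparkContext

    // (path, preferred hosts): files 1-2000 live on node A (host_1),
    // files 2001-4000 live on node B (host_2). Replace with real hostnames.
    val pathsWithHosts =
      (1 to 2000).map(i => (s"/root/data/$i.txt", Seq("host_1"))) ++
      (2001 to 4000).map(i => (s"/root/data/$i.txt", Seq("host_2")))

    // This makeRDD overload creates one partition per element and records its
    // preferred location, so each file's task is scheduled on the node holding it.
    val countByPath = sc.makeRDD[String](pathsWithHosts)
      .mapPartitions { paths =>
        // Each partition contains exactly one path.
        paths.map { path =>
          val source = Source.fromFile(path)
          val count =
            try source.mkString.split("\\s+").count(_.nonEmpty)
            finally source.close()
          (path, count)
        }
      }

    import spark.implicits._
    countByPath.toDF("path", "count")
      .write.mode("overwrite").option("header", "true").csv("/root/data/result")

    Keep in mind that preferred locations are a scheduling hint rather than a hard guarantee: the scheduler can fall back to another node once spark.locality.wait expires, so you may need to raise that setting if strict placement matters.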
    

    Please see resources below for more information.