apache-spark · spark-core

Where does an app that uses Spark execute non-Spark-context code?


For example, I have the following code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    RestController restController = new RestController();
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("test example")
            .getOrCreate();

    Dataset<Row> csvFileDF = sparkSession.read().csv("test_csv");

    // code in task //
    restController.sendFile();
    // __________//

    csvFileDF.write().parquet("test_parquet");
}

The method restController.sendFile() is not executed in the Spark context, as opposed to the read-CSV and write-Parquet operations.

The jar is run with:

spark-submit main.jar

Do I understand correctly that restController.sendFile() is executed on the driver?


Solution

  • In general, the calculations that take place on your executors are the actions/transformations that you perform on distributed data (RDDs, DataFrames, Datasets). Everything else takes place on the driver, because those calculations are not distributed.

    So in your case, it does indeed seem like restController.sendFile() only takes place on the driver, but I can't say for sure because I don't know what that method does (see the Java sketch at the end of this answer).

    Let's make a very simple example:

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    myList = [
        (1,),
        (2,),
        (3,),
        (4,),
        (5,),
        (6,),
        (7,),
        (8,),
        (9,),
        (10,),
    ]
    df = spark.createDataFrame(
        myList,
        ["myInt"],
    )
    
    # Distributed work: this transformation and the write are executed by Spark
    df2 = df.withColumn("increment", F.col("myInt") + 1)
    df2.write.csv("myTestFile.csv")
    
    # Plain Python: this list comprehension runs on the driver only
    myList2 = [(x[0], x[0] + 1) for x in myList]
    

    Here you can see that we:

    • create a df2 dataframe by incrementing the first column by 1
    • create a myList2 list by doing the same thing

    When looking at the spark history server for that application, we see:

    [Screenshot of the Spark history server: only the DataFrame write shows up as a Spark job]

    Only the dataframe operation happened in our Spark context. The rest happened on the driver as a normal, non-distributed calculation.
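    To tie this back to the Java code from the question, below is a minimal, illustrative sketch of the same distinction. The driver-side println stands in for a call like restController.sendFile(), and foreachPartition is just one example of a distributed operation whose function body Spark ships to the executors; whether something like sendFile could actually run there depends on it being serializable and safe to execute per partition.

    import java.util.Arrays;
    
    import org.apache.spark.api.java.function.ForeachPartitionFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    public class DriverVsExecutorExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("driver vs executor example")
                    .getOrCreate();
    
            // Plain JVM code like this runs exactly once, on the driver
            // (this is where a call such as restController.sendFile() would run).
            System.out.println("running on the driver");
    
            StructType schema = new StructType().add("myInt", DataTypes.IntegerType);
            Dataset<Row> df = spark.createDataFrame(
                    Arrays.asList(RowFactory.create(1), RowFactory.create(2), RowFactory.create(3)),
                    schema);
    
            // Only the function passed to a distributed operation is shipped to the
            // executors; the explicit cast picks the Java-friendly overload of
            // foreachPartition.
            df.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
                while (rows.hasNext()) {
                    // This body runs on an executor, once per partition.
                    System.out.println("executor saw: " + rows.next().getInt(0));
                }
            });
    
            spark.stop();
        }
    }

    If sendFile really did need to run per record or per partition, it would have to be invoked from inside such an operation; called at the top level of main, as in your snippet, it stays on the driver.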