Tags: scala, apache-spark, workflow

Workflow and Scheduling Framework for Spark with Scala in Maven, Developed in IntelliJ IDEA


I have created a Spark project with Scala. It's a Maven project with all dependencies configured in the POM.

I am using Spark for ETL. The source is a file generated by an API; all the transformations are done in Spark, and the result is then loaded into Cassandra.

Is there any workflow software that can use this jar to automate the process, with email notifications on job success or failure?

Could someone please help me? Can Airflow be used for this purpose? I have used Scala, not Python.

Kindly share your thoughts.


Solution

  • There is no built-in mechanism in Spark that will help. A cron job seems reasonable for your case. If you find yourself continuously adding dependencies to the scheduled job, try Azkaban.

    One example of such a shell script is:

    #!/bin/bash
    cd /locm/spark_jobs

    export SPARK_HOME=/usr/hdp/2.2.0.0-2041/spark
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export HADOOP_USER_NAME=hdfs
    export HADOOP_GROUP=hdfs

    #export SPARK_CLASSPATH=$SPARK_CLASSPATH:/locm/spark_jobs/configs/*

    # Positional parameters: main class, Spark master, extra spark-submit flags, application arguments
    CLASS=$1
    MASTER=$2
    ARGS=$3
    CLASS_ARGS=$4
    echo "Running $CLASS With Master: $MASTER With Args: $ARGS And Class Args: $CLASS_ARGS"

    $SPARK_HOME/bin/spark-submit --class "$CLASS" --master "$MASTER" --num-executors 4 --executor-cores 4 $ARGS "application jar file" $CLASS_ARGS
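
    To run this on a schedule, the wrapper script can simply be invoked from cron, and cron itself covers the email requirement: any output the job writes that is not redirected is mailed to the address in MAILTO. A crontab entry along these lines would run the job nightly at 2 AM; the script name, class name, master and log path are assumptions, not part of the original setup:

    # anything the job writes to stderr (not redirected below) is mailed to this address
    MAILTO=ops-team@example.com
    # m  h  dom  mon  dow  command
    0 2 * * * /locm/spark_jobs/submit_job.sh com.example.EtlJob yarn-client >> /locm/spark_jobs/etl.log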
    

    You can also try spark-launcher, which can be used to start a Spark application programmatically:

    First, create a sample Spark application and build a jar file for it.

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object SparkApp extends App {
      val conf = new SparkConf().setMaster("local[*]").setAppName("spark-app")
      val sc = new SparkContext(conf)
      val rdd = sc.parallelize(Array(2, 3, 2, 1))
      rdd.saveAsTextFile("result")
      sc.stop()
    }
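
    To build that jar with sbt assembly, a minimal build definition might look roughly like the following; the project name, versions and plugin version are assumptions, chosen so the output matches the jar name used below:

    // project/plugins.sbt -- pulls in the sbt-assembly plugin (version is an assumption)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

    // build.sbt -- name/version chosen so `sbt assembly` produces spark_launcher-assembly-1.0.jar
    name := "spark_launcher"
    version := "1.0"
    scalaVersion := "2.10.6"
    // Spark itself is "provided": spark-submit supplies it on the classpath at run time
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0" % "provided"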
    

    Once this simple Spark application has been packaged with sbt assembly, we write a Scala application through which we start it, as follows:

    import org.apache.spark.launcher.SparkLauncher

    object Launcher extends App {
      val spark = new SparkLauncher()
        .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
        .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
        .setMainClass("SparkApp")
        .setMaster("local[*]")
        .launch()
      spark.waitFor()
    }
    

    In the above code we create a SparkLauncher object and set values on it:

    .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6") sets the Spark home, which is used internally to call spark-submit.

    .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar") specifies the jar of our Spark application.

    .setMainClass("SparkApp") sets the entry point of the Spark program, i.e. the driver program.

    .setMaster("local[*]") sets the master address; here we run it on the local machine.

    .launch() simply starts our Spark application.

    This is only the minimal setup; you can also set many other options, such as passing arguments, adding jars, and setting Spark configuration properties, as sketched below.
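
    For instance, a sketch of a more fully configured launcher might look like the following; the argument values, extra jar path and configuration values are assumptions, while addAppArgs, addJar, setConf and the SparkLauncher.DRIVER_MEMORY constant are part of the SparkLauncher API:

    import org.apache.spark.launcher.SparkLauncher

    object ConfiguredLauncher extends App {
      val spark = new SparkLauncher()
        .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
        .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
        .setMainClass("SparkApp")
        .setMaster("local[*]")
        .addAppArgs("input.txt", "output")                // arguments passed to the application's main method
        .addJar("/home/knoldus/libs/some-dependency.jar") // extra jar shipped with the application
        .setConf("spark.executor.memory", "2g")           // any spark.* configuration property
        .setConf(SparkLauncher.DRIVER_MEMORY, "1g")       // constants exist for common keys
        .launch()
      spark.waitFor()
    }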