Search code examples
apache-spark · kubernetes · airflow · spark-submit · kubernetespodoperator

I am relatively new to Airflow and Spark and want to use the KubernetesPodOperator in an Airflow DAG to run a spark-submit command


I am using Kubernetes version 1.25 (client and server), and I have deployed Airflow on this environment using the official Helm chart. I want a KubernetesPodOperator task in my Airflow DAG to run spark-submit so that it spawns a driver pod and an executor pod, which then perform the job. The DAG performs the following steps: 1. read a table from MySQL, 2. dump it to a text file, 3. upload that file to a MinIO bucket (similar to AWS S3). Currently the driver pod is spawned together with an executor pod. The driver pod never reaches the Running state and eventually fails, which causes the executor pod to fail as well. I am authenticating the calls to the Kubernetes API using a service account that I pass in as a configuration option.

This is my redacted DAG. Note that the spark-submit command works perfectly fine from the command line inside a container of the image and produces the expected outcome, so I suspect I am missing some DAG configuration. Also note that all the JARs I refer to here are already part of the image and are referenced from **/opt/spark/connectors/**; I have verified this by exec-ing into a container of the image.

import logging
import csv
import airflow
from airflow import DAG
from airflow.utils import dates as date
from datetime import timedelta, datetime
from airflow.providers.apache.spark.operators.spark_jdbc import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from dateutil.tz import tzlocal
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
import pendulum
#from airflow.models import Variables
# Timezone attached to the DAG's start_date further down.
local_tz = pendulum.timezone("Asia/Dubai")


# Volume sources: each volume is backed by an existing PersistentVolumeClaim,
# identified by its claim name.
volume_config = {"persistentVolumeClaim": {"claimName": "nfspvc-airflow-executable"}}
air_connectors_volume_config = {"persistentVolumeClaim": {"claimName": "nfspvc-airconnectors"}}

# Mounts the "data-volume" volume read-write at /air-spark/ inside the pod.
volume_mount = VolumeMount(
    "data-volume",
    mount_path="/air-spark/",
    sub_path=None,
    read_only=False,
)


# Mounts the "air-connectors" volume read-write at /air-connectors/ inside the pod.
air_connectors_volume_mount = VolumeMount(
    "air-connectors",
    mount_path="/air-connectors/",
    sub_path=None,
    read_only=False,
)

# Volume objects; each name must match the first argument of the
# corresponding VolumeMount above.
volume = Volume(
    name="data-volume",
    configs=volume_config
)

air_connectors_volume = Volume(
    name="air-connectors",
    configs=air_connectors_volume_config
)

# Arguments applied to every task of the DAG: one retry after an hour,
# no e-mail notifications.
default_args = {
    'owner': 'panoiqtest',
    'depends_on_past': False,
    'start_date': datetime(2021, 5, 1, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}
# Cron expression '0 */1 * * *' fires at minute 0 of every hour; catchup is
# disabled, so past intervals since start_date are not backfilled.
dag_daily  = DAG(dag_id='operator',
             default_args=default_args,
             catchup=False,
             schedule_interval='0 */1 * * *')
# spark-submit style settings. Not referenced anywhere else in the visible
# code; load_table() below hard-codes its own command-line arguments.
_config = {
        'application': '/opt/spark/connectors/spark-etl-assembly-2.0.jar',
        'num_executors': 2,
        'driver_memory': '5G',
        'executor_memory': '10G',
       'driver_class_path':'/opt/spark/connectors/mysql-connector-java-5.1.49.jar',
        'jars':'/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar',
       #'java_class': 'com.spark.ETLHandler'
    }
# Extra class-path settings; also not referenced in the visible code.
# NOTE(review): the entries are comma-separated, whereas Java class paths are
# normally ':'-separated on Linux -- confirm before wiring this dict in.
spark_config = {
        "spark.executor.extraClassPath":"/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar",
        "spark.driver.extraClassPath":"/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar"
    }

# Stand-alone task that lists /air-spark/ and prints the working directory.
# It is not chained to any other task in the visible code.
t2 = BashOperator(
    task_id='bash_example',
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="ls /air-spark/ && pwd",
    dag=dag_daily)

def get_tables(table_file='/csv-directory/success-dag.csv', **kwargs):
    """Read the table-definition CSV and return its data rows.

    Args:
        table_file: Path of a comma-delimited CSV file. Its first row is
            treated as a header and is not returned.
        **kwargs: Accepted and ignored, so the function can also be used as
            a ``python_callable`` that receives extra keyword arguments.

    Returns:
        A list of rows, each row being a list of strings. An empty file (or
        one holding only the header) yields an empty list.

    Raises:
        OSError: If ``table_file`` cannot be opened.
    """
    logging.info("#Starting get_tables()#")
    # newline='' is what the csv module expects so that it can handle line
    # breaks embedded in quoted fields itself.
    with open(table_file, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        # Drop the header; the default keeps an empty file from raising.
        next(reader, None)
        return list(reader)
def load_table(table_name, application_args, **kwargs):
    """Build the KubernetesPodOperator that runs ``spark-submit`` for one table.

    The pod runs ``spark-submit`` in cluster deploy mode against the
    Kubernetes master, followed by the application JAR and the
    per-table ``application_args``.

    Args:
        table_name: Identifier of the table being loaded. Not used in the
            body at present (the task and pod names are fixed strings).
        application_args: Sequence of strings appended after the application
            JAR on the ``spark-submit`` command line.
        **kwargs: Accepted and ignored.

    Returns:
        A ``KubernetesPodOperator`` attached to ``dag_daily``.
    """
    # Connector JARs, listed once so the two options below cannot drift apart.
    connector_jars = [
        '/opt/spark/connectors/mysql-connector-java-5.1.49.jar',
        '/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar',
        '/opt/spark/connectors/hadoop-aws-3.3.1.jar',
    ]

    # NOTE(review): in cluster mode on Kubernetes, files that already live
    # inside the container image are normally referenced with a local://
    # URI -- confirm whether the JAR paths below need that scheme.
    k8s_arguments = [
        '--name=datalake-pod',
        '--master=k8s://https://IP:6443',
        '--deploy-mode=cluster',
        '--executor-memory=8192m',
        '--conf=spark.kubernetes.authenticate.driver.serviceAccountName=air-airflow-sa',
        # A Java class path is ':'-separated on Linux; a comma-separated
        # value is read as one single (nonexistent) entry.
        '--driver-class-path=' + ':'.join(connector_jars),
        '--conf=spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp',
        # --jars, by contrast, takes a comma-separated list.
        '--jars=' + ','.join(connector_jars),
        '--conf=spark.kubernetes.namespace=development',
        '--conf=spark.kubernetes.container.image=image_name',
        '--conf=spark.kubernetes.container.image.pullSecrets=Secret_name',
        '--conf=spark.kubernetes.container.image.pullPolicy=Always',
        '--conf=spark.dynamicAllocation.enabled=true',
        '--conf=spark.dynamicAllocation.shuffleTracking.enabled=true',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.mount.path=/air-connectors/',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.mount.readOnly=false',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.options.claimName=nfspvc-airconnectors',
        '--conf=spark.kubernetes.file.upload.path=/opt/spark',
        '--class=com.spark.ETLHandler',
        '/opt/spark/connectors/spark-etl-assembly-2.0.jar',
    ]

    # Application arguments must come after the application JAR.
    all_arguments = k8s_arguments + list(application_args)

    # NOTE(review): task_id is the same for every call; if the CSV holds more
    # than one row Airflow will presumably reject the duplicate id -- confirm
    # and derive a unique, sanitized id from table_name if so.
    return KubernetesPodOperator(
        dag=dag_daily,
        name="zombie-dry-run",
        image='imagerepo.io:5050/panoiq/tools:sparktester',
        image_pull_policy='Always',
        image_pull_secrets='registry',
        namespace='development',
        cmds=['spark-submit'],
        arguments=all_arguments,
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        volumes=[volume, air_connectors_volume],
        volume_mounts=[volume_mount, air_connectors_volume_mount],
    )


# Task that runs get_tables() when the DAG executes. The loop below calls
# get_tables() itself rather than consuming this task's result.
push_tables_list = PythonOperator(
    task_id="load_tables_list",
    python_callable=get_tables,
    dag=dag_daily,
)
# Join point that every per-table task feeds into.
complete = DummyOperator(task_id="complete", dag=dag_daily)

# One spark-submit task per CSV row; the columns are addressed by position.
for rec in get_tables():
    jdbcUrl = rec[4] + rec[8]
    username = rec[5]
    password = rec[6]
    # The table name and the select query are read from the same column.
    table_name = rec[9]
    select_query = rec[9]
    entitlement = rec[10]
    remarks = rec[11]
    s3_object_name = rec[13]
    s3_folder_name = rec[14]
    s3_folder_format = rec[16]
    # '#'-separated lineage entries are passed on comma-separated.
    lineagegraph = rec[17].replace("#", ",")
    application_args = [
        select_query,
        s3_folder_name,
        jdbcUrl,
        lineagegraph,
        entitlement,
        remarks,
        username,
        password,
        s3_folder_format,
        s3_object_name,
    ]
    push_tables_list >> load_table(table_name, application_args) >> complete

Any help or pointers on this issue are appreciated. Thanks in advance!


Solution

  • I was able to fix this issue with the code below. I use the pod launched by Airflow itself as the driver; it spawns an executor pod, runs the jobs, and dies once the job flow has completed.

    Below is my Python file for anyone who needs to do this again:

    import logging
    import csv
    import airflow
    from airflow import DAG
    from airflow.utils import dates as date
    from datetime import timedelta, datetime
    from airflow.providers.apache.spark.operators.spark_jdbc import SparkSubmitOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    #from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
    from dateutil.tz import tzlocal
    from airflow.kubernetes.volume import Volume
    from airflow.kubernetes.volume_mount import VolumeMount
    import pendulum
    #from airflow.models import Variables
    # Timezone attached to the DAG's start_date below.
    local_tz = pendulum.timezone("Asia/Dubai")



    # Arguments applied to every task of the DAG: one retry after an hour,
    # no e-mail notifications.
    default_args = {
        'owner': 'test',
        'depends_on_past': False,
        'start_date': datetime(2021, 5, 1, tzinfo=local_tz),
        'retries': 1,
        'retry_delay': timedelta(hours=1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False
    }

    # Hourly DAG; catchup is disabled, so past intervals are not backfilled.
    dag_daily  = DAG(dag_id='datapipeline',
                 default_args=default_args,
                 catchup=False,
                 schedule_interval='@hourly')

    # Placeholder task; it is not chained to any other task in the visible code.
    start = DummyOperator(task_id='run_this_first', dag=dag_daily)


    # spark-submit style settings. Not referenced anywhere else in the visible
    # code; load_table() below hard-codes its own command-line arguments.
    _config = {
            'application': '/air-spark/spark-etl-assembly-2.0.jar',
            'num_executors': 2,
            'driver_memory': '5G',
            'executor_memory': '10G',
           'driver_class_path':'/air-connectors/mysql-connector-java-5.1.49.jar',
            'jars':'/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar',
           #'java_class': 'com.spark.ETLHandler'
        }

    # Extra class-path settings; also not referenced in the visible code.
    # NOTE(review): the entries are comma-separated, whereas Java class paths
    # are normally ':'-separated on Linux -- confirm before using this dict.
    spark_config = {
            "spark.executor.extraClassPath":"/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar",
            "spark.driver.extraClassPath":"/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar"
        }

    # Stand-alone task that lists /air-spark/ and prints the working directory.
    # It is not chained to any other task in the visible code.
    t2 = BashOperator(
        task_id='bash_example',
        # "scripts" folder is under "/usr/local/airflow/dags"
        bash_command="ls /air-spark/ && pwd",
        dag=dag_daily)
    
    def get_tables(table_file='/csv-directory/success-dag.csv', **kwargs):
        """Read the table-definition CSV and return its data rows.

        Args:
            table_file: Path of a comma-delimited CSV file. Its first row is
                treated as a header and is not returned.
            **kwargs: Accepted and ignored, so the function can also be used
                as a ``python_callable`` that receives extra keyword arguments.

        Returns:
            A list of rows, each row being a list of strings. An empty file
            (or one holding only the header) yields an empty list.

        Raises:
            OSError: If ``table_file`` cannot be opened.
        """
        logging.info("#Starting get_tables()#")
        # newline='' is what the csv module expects so that it can handle
        # line breaks embedded in quoted fields itself.
        with open(table_file, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            # Drop the header; the default keeps an empty file from raising.
            next(reader, None)
            return list(reader)
    def load_table(table_name, application_args, **kwargs):
        """Create the pod task that runs ``spark-submit`` for one table.

        ``spark-submit`` is started with ``--master local[*]`` inside the pod
        launched by the operator. ``application_args`` are appended after the
        application JAR. ``table_name`` and ``**kwargs`` are accepted but not
        used in the body.

        Returns the ``KubernetesPodOperator`` attached to ``dag_daily``.
        """
        submit_options = [
            "--master", "local[*]",
            "--conf", "spark.executor.extraClassPath=/air-connectors/mysql-connector-java-5.1.49.jar",
            "--conf", "spark.driver.extraClassPath=/opt/spark/connectors/mysql-connector-java-5.1.49.jar",
            "--jars", "/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/ojdbc11-21.7.0.0.jar",
            "--conf=spark.kubernetes.container.image=imagerepo.io:5050/tools:sparktesterV0.6",
            "--conf=spark.kubernetes.container.image.pullSecrets=registry",
            "--num-executors", "5",
            "--executor-memory", "1G",
            "--driver-memory", "2G",
            "--class=com.spark.ETLHandler",
            "--name", "arrow-spark",
            # The application JAR comes last; everything after it goes to the app.
            "/opt/spark/connectors/spark-etl-assembly-2.0.jar",
        ]

        pod_task = KubernetesPodOperator(
            dag=dag_daily,
            task_id="data_pipeline_k8s",
            name="data_pipeline_k8s",
            namespace="development",
            service_account_name="air-airflow-worker",
            image="imagerepo.io:5050/tools:sparktesterV0.6",
            image_pull_policy="Always",
            image_pull_secrets="registry",
            cmds=["spark-submit"],
            arguments=submit_options + application_args,
            get_logs=True,
        )
        return pod_task
    
    
    #    spark.set_upstream(start)
    
    
    
    
    # Task that runs get_tables() when the DAG executes. The loop below calls
    # get_tables() itself rather than consuming this task's result.
    push_tables_list = PythonOperator(
        task_id="load_tables_list",
        python_callable=get_tables,
        dag=dag_daily,
    )
    # Join point that every per-table task feeds into.
    complete = DummyOperator(task_id="complete", dag=dag_daily)

    # One spark-submit task per CSV row; the columns are addressed by position.
    for rec in get_tables():
        jdbcUrl = rec[4] + rec[8]
        username = rec[5]
        password = rec[6]
        # The table name and the select query are read from the same column.
        table_name = rec[9]
        select_query = rec[9]
        entitlement = rec[10]
        remarks = rec[11]
        s3_object_name = rec[13]
        s3_folder_name = rec[14]
        s3_folder_format = rec[16]
        # '#'-separated lineage entries are passed on comma-separated.
        lineagegraph = rec[17].replace("#", ",")
        application_args = [
            select_query,
            s3_folder_name,
            jdbcUrl,
            lineagegraph,
            entitlement,
            remarks,
            username,
            password,
            s3_folder_format,
            s3_object_name,
        ]
        push_tables_list >> load_table(table_name, application_args) >> complete