I am using Kubernetes 1.25 (client and server) and have deployed Airflow on the cluster with the official Helm chart. My DAG uses a KubernetesPodOperator that runs spark-submit, which should spawn a driver pod and an executor pod that carry out the task. The DAG does the following: 1. take a table from MySQL, 2. dump it into a text file, 3. put that file into a MinIO bucket (similar to AWS S3). Currently the driver pod spawns along with the executor pod, but the driver never reaches a running state and eventually fails, which causes the executor pod to fail as well. I am authenticating the calls to the Kubernetes API with a Service Account that I pass as a configuration.
This is my redacted DAG. Note that the same spark-submit command works perfectly fine on the command line inside a container of this image and produces the expected outcome, so I suspect it is some DAG configuration that I might be missing here. Also note that all the jars I refer to are already part of the image and are referenced from **/opt/spark/connectors/**; I have verified this by exec-ing into the container.
import logging
import csv
import airflow
from airflow import DAG
from airflow.utils import dates as date
from datetime import timedelta, datetime
from airflow.providers.apache.spark.operators.spark_jdbc import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from dateutil.tz import tzlocal
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
import pendulum
#from airflow.models import Variables
local_tz = pendulum.timezone("Asia/Dubai")
volume_config = {"persistentVolumeClaim": {"claimName": "nfspvc-airflow-executable"}}
air_connectors_volume_config = {"persistentVolumeClaim": {"claimName": "nfspvc-airconnectors"}}
volume_mount = VolumeMount(
    "data-volume",
    mount_path="/air-spark/",
    sub_path=None,
    read_only=False,
)
air_connectors_volume_mount = VolumeMount(
    "air-connectors",
    mount_path="/air-connectors/",
    sub_path=None,
    read_only=False,
)
volume = Volume(
    name="data-volume",
    configs=volume_config
)
air_connectors_volume = Volume(
    name="air-connectors",
    configs=air_connectors_volume_config
)
default_args = {
    'owner': 'panoiqtest',
    'depends_on_past': False,
    'start_date': datetime(2021, 5, 1, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}
dag_daily = DAG(dag_id='operator',
                default_args=default_args,
                catchup=False,
                schedule_interval='0 */1 * * *')
_config = {
    'application': '/opt/spark/connectors/spark-etl-assembly-2.0.jar',
    'num_executors': 2,
    'driver_memory': '5G',
    'executor_memory': '10G',
    'driver_class_path': '/opt/spark/connectors/mysql-connector-java-5.1.49.jar',
    'jars': '/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar',
    # 'java_class': 'com.spark.ETLHandler'
}
spark_config = {
    "spark.executor.extraClassPath": "/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar",
    "spark.driver.extraClassPath": "/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar"
}
t2 = BashOperator(
    task_id='bash_example',
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="ls /air-spark/ && pwd",
    dag=dag_daily)
def get_tables(table_file='/csv-directory/success-dag.csv', **kwargs):
    logging.info("#Starting get_tables()#")
    tables_list = []
    with open(table_file) as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        tables_list = [row for row in reader]
    tables_list.pop(0)  # remove header
    return tables_list
def load_table(table_name, application_args, **kwargs):
    k8s_arguments = [
        '--name=datalake-pod',
        '--master=k8s://https://IP:6443',
        '--deploy-mode=cluster',
        # '--driver-cores=4',
        # '--executor-cores=4',
        # '--num-executors=1',
        # '--driver-memory=8192m',
        '--executor-memory=8192m',
        '--conf=spark.kubernetes.authenticate.driver.serviceAccountName=air-airflow-sa',
        '--driver-class-path=/opt/spark/connectors//mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar',
        '--conf=spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp',
        '--jars=/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/aws-java-sdk-bundle-1.12.374.jar,/opt/spark/connectors/hadoop-aws-3.3.1.jar',
        '--conf=spark.kubernetes.namespace=development',
        # '--conf=spark.driver.cores=4',
        # '--conf=spark.executor.cores=4',
        # '--conf=spark.driver.memory=8192m',
        # '--conf=spark.executor.memory=8192m',
        '--conf=spark.kubernetes.container.image=image_name',
        '--conf=spark.kubernetes.container.image.pullSecrets=Secret_name',
        '--conf=spark.kubernetes.container.image.pullPolicy=Always',
        '--conf=spark.dynamicAllocation.enabled=true',
        '--conf=spark.dynamicAllocation.shuffleTracking.enabled=true',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.mount.path=/air-connectors/',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.mount.readOnly=false',
        '--conf=spark.kubernetes.driver.volumes.persistentVolumeClaim.air-connectors.options.claimName=nfspvc-airconnectors',
        '--conf=spark.kubernetes.file.upload.path=/opt/spark',
        '--class=com.spark.ETLHandler',
        '/opt/spark/connectors/spark-etl-assembly-2.0.jar'
    ]
    all_arguments = k8s_arguments + application_args
    return KubernetesPodOperator(
        dag=dag_daily,
        name="zombie-dry-run",  # "spark_submit_for_" + table_name
        # image='image_name',
        image='imagerepo.io:5050/panoiq/tools:sparktester',
        image_pull_policy='Always',
        image_pull_secrets='registry',
        namespace='development',
        cmds=['spark-submit'],
        arguments=all_arguments,
        labels={"foo": "bar"},
        task_id="dry_run_demo",  # "spark_submit_for_" + table_name
        # config_file="conf",
        volumes=[volume, air_connectors_volume],
        volume_mounts=[volume_mount, air_connectors_volume_mount],
    )
push_tables_list = PythonOperator(task_id="load_tables_list",
                                  python_callable=get_tables,
                                  dag=dag_daily)
complete = DummyOperator(task_id="complete",
                         dag=dag_daily)
for rec in get_tables():
    table_name = rec[9]
    s3_folder_name = rec[14]
    s3_object_name = rec[13]
    jdbcUrl = rec[4] + rec[8]
    lineagegraph = ",".join(rec[17].split("#"))
    entitlement = rec[10]
    remarks = rec[11]
    username = rec[5]
    password = rec[6]
    s3_folder_format = rec[16]
    select_query = rec[9]
    application_args = [select_query, s3_folder_name, jdbcUrl, lineagegraph, entitlement, remarks, username, password, s3_folder_format, s3_object_name]
    push_tables_list >> load_table(table_name, application_args) >> complete
Any help or pointers on this issue are appreciated! Thanks in advance!
I was able to fix this issue with the code below. I was able to use the Airflow pod itself as the driver, and it just spawns the executor, runs the job, and dies once the job flow is completed.
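The key change is that spark-submit now runs with --master local[*], so the pod launched by the KubernetesPodOperator acts as the Spark driver itself instead of asking the Kubernetes API server to create a separate driver pod, and the operator is given an explicit service_account_name. A minimal sketch of that pattern (the image, namespace, secret and service-account names are from my setup, and all_arguments is built the same way as in the full file below):

    KubernetesPodOperator(
        task_id="data_pipeline_k8s",
        name="data_pipeline_k8s",
        namespace="development",
        image="imagerepo.io:5050/tools:sparktesterV0.6",
        image_pull_secrets="registry",
        image_pull_policy="Always",
        service_account_name="air-airflow-worker",
        get_logs=True,
        cmds=["spark-submit"],
        # all_arguments starts with "--master", "local[*]" followed by the jars,
        # the application jar and the per-table arguments
        arguments=all_arguments,
        dag=dag_daily,
    )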
Below is my Python file for anyone that needs to do this again:
import logging
import csv
import airflow
from airflow import DAG
from airflow.utils import dates as date
from datetime import timedelta, datetime
from airflow.providers.apache.spark.operators.spark_jdbc import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
#from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from dateutil.tz import tzlocal
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
import pendulum
#from airflow.models import Variables
local_tz = pendulum.timezone("Asia/Dubai")
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2021, 5, 1, tzinfo=local_tz),
    'retries': 1,
    'retry_delay': timedelta(hours=1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}
dag_daily = DAG(dag_id='datapipeline',
                default_args=default_args,
                catchup=False,
                schedule_interval='@hourly')
start = DummyOperator(task_id='run_this_first', dag=dag_daily)
_config = {
    'application': '/air-spark/spark-etl-assembly-2.0.jar',
    'num_executors': 2,
    'driver_memory': '5G',
    'executor_memory': '10G',
    'driver_class_path': '/air-connectors/mysql-connector-java-5.1.49.jar',
    'jars': '/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar',
    # 'java_class': 'com.spark.ETLHandler'
}
spark_config = {
    "spark.executor.extraClassPath": "/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar",
    "spark.driver.extraClassPath": "/air-connectors/mysql-connector-java-5.1.49.jar,/air-connectors/aws-java-sdk-bundle-1.12.374.jar,/air-connectors/hadoop-aws-3.3.1.jar"
}
t2 = BashOperator(
    task_id='bash_example',
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="ls /air-spark/ && pwd",
    dag=dag_daily)
def get_tables(table_file='/csv-directory/success-dag.csv', **kwargs):
    logging.info("#Starting get_tables()#")
    tables_list = []
    with open(table_file) as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        tables_list = [row for row in reader]
    tables_list.pop(0)  # remove header
    return tables_list
def load_table(table_name, application_args, **kwargs):
    k8s_arguments = [
        "--master", "local[*]",
        "--conf", "spark.executor.extraClassPath=/air-connectors/mysql-connector-java-5.1.49.jar",
        "--conf", "spark.driver.extraClassPath=/opt/spark/connectors/mysql-connector-java-5.1.49.jar",
        "--jars", "/opt/spark/connectors/mysql-connector-java-5.1.49.jar,/opt/spark/connectors/ojdbc11-21.7.0.0.jar",
        "--conf=spark.kubernetes.container.image=imagerepo.io:5050/tools:sparktesterV0.6",
        "--conf=spark.kubernetes.container.image.pullSecrets=registry",
        "--num-executors", "5",
        "--executor-memory", "1G",
        "--driver-memory", "2G",
        "--class=com.spark.ETLHandler",
        "--name", "arrow-spark",
        "/opt/spark/connectors/spark-etl-assembly-2.0.jar"
    ]
    all_arguments = k8s_arguments + application_args
    # spark =
    return KubernetesPodOperator(
        image="imagerepo.io:5050/tools:sparktesterV0.6",
        service_account_name="air-airflow-worker",
        name="data_pipeline_k8s",
        task_id="data_pipeline_k8s",
        get_logs=True,
        dag=dag_daily,
        namespace="development",
        image_pull_secrets="registry",
        image_pull_policy="Always",
        cmds=["spark-submit"],
        arguments=all_arguments
    )
# spark.set_upstream(start)
push_tables_list = PythonOperator(task_id="load_tables_list", python_callable=get_tables, dag=dag_daily)
complete = DummyOperator(task_id="complete", dag=dag_daily)
for rec in get_tables():
    table_name = rec[9]
    s3_folder_name = rec[14]
    s3_object_name = rec[13]
    jdbcUrl = rec[4] + rec[8]
    lineagegraph = ",".join(rec[17].split("#"))
    entitlement = rec[10]
    remarks = rec[11]
    username = rec[5]
    password = rec[6]
    s3_folder_format = rec[16]
    select_query = rec[9]
    application_args = [select_query, s3_folder_name, jdbcUrl, lineagegraph, entitlement, remarks, username, password, s3_folder_format, s3_object_name]
    push_tables_list >> load_table(table_name, application_args) >> complete
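With this version, the pod launched by the KubernetesPodOperator (data_pipeline_k8s in the development namespace) runs the whole spark-submit in local mode inside itself; get_logs=True streams the spark-submit output into the Airflow task log, and the task finishes when spark-submit exits, which is the "runs the job and dies once completed" behaviour described above.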