I am using Spark with Airflow, but I am not able to pass arguments to the Spark application. I have tried multiple ways; please suggest the right way to do this.
dag.py file:
base_operator = SparkKubernetesOperator(
    application_file="spark-pi.yaml",
    task_id='segment_tag_refresh_process',
    namespace="spark-jobs",
    api_group="sparkoperator.k8s.io",
    api_version="v1beta2",
    parms={"ID": '1'},
    dag=dag
)
spark-pi.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-create-file
spec:
  type: Scala
  mode: cluster
  image: imagefilename
  imagePullSecrets:
    - sparkairlow
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.
  mainApplicationFile: local:///data/processing.py
  arguments: {{ parms.ID }}
  sparkVersion: 3.5.0
  sparkConf:
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: /data/logs
  # ... other configurations ...
To read the arguments in processing.py, I am using sys.argv:
import sys
print("**********",sys.argv)
But the arguments do not show up there.
If I'm missing anything, please ask and I'll update the question.
I solved this issue using:
dag.py
dag = DAG('spark_application', default_args=default_args, schedule_interval=None)

arguments_to_pass = {
    'id': '1'
}

spark_operator = SparkKubernetesOperator(
    task_id='spark_submit_task',
    namespace='your_namespace',  # Update with your Kubernetes namespace
    application_file="path/to/your/spark-application.yaml",
    kubernetes_conn_id='your_kubernetes_connection_id',
    params=arguments_to_pass,
    dag=dag,
)
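For completeness, a minimal sketch of the imports and default_args the snippet above assumes (the import path comes from the apache-airflow-providers-cncf-kubernetes package; the owner and start date are placeholders, adjust them to your setup):

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

# Placeholder defaults for the DAG above; adjust to your environment.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
}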
spark-pi.yaml
# Your SparkApplication YAML file
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-create-file
spec:
  # other configurations...
  arguments: ["--id={{ params.id }}"]
The application_file is rendered as a Jinja template, so {{ params.id }} is replaced with the value passed via params. Reading the arguments in processing.py:
import sys
print("**********",sys.argv)
Now it's working for me.
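If you want named access rather than raw sys.argv, here is a minimal sketch of how processing.py could parse the --id flag with argparse (the flag name matches the arguments entry in the YAML above):

import argparse
import sys

# Spark passes the rendered arguments straight through, e.g. ['--id=1'].
print("**********", sys.argv)

parser = argparse.ArgumentParser()
parser.add_argument("--id", required=True)
args = parser.parse_args()
print("id =", args.id)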