python apache-spark kubernetes pyspark airflow

Passing arguments to SparkKubernetesOperator


I am using Spark with Airflow, but I am not able to pass arguments to the Spark job. I have tried multiple ways; please suggest the right way to do this.

dag.py file:

base_operator = SparkKubernetesOperator(
                application_file="spark-pi.yaml",
                task_id='segment_tag_refresh_process',
                namespace="spark-jobs", 
                api_group="sparkoperator.k8s.io",
                api_version="v1beta2",
                parms= {"ID": '1'},
                dag=dag
        )

spark-pi.yaml file:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
   name: spark-create-file
spec:
  type: Scala
  mode: cluster
  image: imagefilename
  imagePullSecrets:
    - sparkairlow
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.
  mainApplicationFile: local:///data/processing.py
  arguments: {{ parms.ID}}
  sparkVersion: 3.5.0
  sparkConf:
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: /data/logs
  ....
  other configurations
  ....

In processing.py I read the arguments from sys.argv:

import sys
print("**********",sys.argv)

But the arguments do not show up in the output.

If I am missing anything, please ask and I will update the question.


Solution

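  • I solved this issue by passing the values through params (the question used parms) and by writing arguments in the YAML as a list of strings: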

    dag.py

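    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

    # default_args is assumed to be defined earlier in the DAG file
    dag = DAG('spark_application', default_args=default_args, schedule_interval=None)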
    
    arguments_to_pass = {
        'id': '1'
    }
    
    spark_operator = SparkKubernetesOperator(
        task_id='spark_submit_task',
        namespace='your_namespace',  # Update with your Kubernetes namespace
        application_file="path/to/your/spark-application.yaml",
        kubernetes_conn_id='your_kubernetes_connection_id',
        params=arguments_to_pass,
        dag=dag,
    )
    

    spark-pi.yaml

    # Your SparkApplication YAML file
    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-create-file
    spec:
      # other configurations...
      arguments: ["--id={{ params.id }}"]
    
    

    Reading the arguments in processing.py:

    import sys
    print("**********",sys.argv)
    

    Now it's working for me.
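
Since the YAML above passes the value as a single "--id=..." string, processing.py could parse it with argparse instead of inspecting sys.argv by hand. The following is only a minimal sketch under that assumption: the function name parse_job_args is hypothetical, and the --id flag is simply the one used in the arguments list above.

```python
import argparse
import sys


def parse_job_args(argv):
    """Parse the flags passed in through the SparkApplication `arguments` list."""
    parser = argparse.ArgumentParser(description="processing.py arguments")
    # "--id=1" arrives as one element of sys.argv, after the script name.
    parser.add_argument("--id", required=True, help="value rendered from params.id")
    # parse_known_args ignores any extra arguments instead of exiting with an error.
    args, _unknown = parser.parse_known_args(argv)
    return args


if __name__ == "__main__":
    # sys.argv[0] is the script path, so only the remaining items are parsed.
    args = parse_job_args(sys.argv[1:])
    print("id =", args.id)
```

Note that the value arrives as a string, so it would need an explicit conversion (for example int(args.id)) if the job expects a number.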