Tags: airflow, airflow-scheduler

Airflow Custom Metrics and/or Result Object with custom fields


While running pySpark SQL pipelines via Airflow, I am interested in extracting some business stats, such as:

  • source read count
  • target write count
  • sizes of DFs during processing
  • error records count

One idea is to push them directly to the metrics backend so they get automatically consumed by monitoring tools like Prometheus. Another idea is to obtain these values via some DAG result object, but I wasn't able to find anything about that in the docs.

Please post at least some pseudo code if you have a solution.


Solution

  • I would look to reuse Airflow's statistics and monitoring support in the airflow.stats.Stats class. Maybe something like this:

    import logging
    from airflow.stats import Stats
    
    PYSPARK_LOG_PREFIX = "airflow_pyspark"
    
    
    def your_python_operator(**context):
        [...]
    
        try:
            # Emit business counters through Airflow's bundled StatsD client
            Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
            Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
            # So on and so forth
        except Exception:
            logging.exception("Caught exception during statistics logging")
    
        [...]
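
  • For these counters to actually reach Prometheus, Airflow's StatsD export needs to be enabled and a bridge such as prometheus/statsd_exporter needs to translate the StatsD packets into a scrape endpoint. A minimal sketch of the relevant airflow.cfg section, assuming Airflow 2.x (host, port, and prefix are placeholder values):

    [metrics]
    statsd_on = True
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow

    With this in place, the counters emitted by Stats.incr are sent to the configured StatsD endpoint, and statsd_exporter can expose them for Prometheus to scrape.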
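
  • For the "result object" idea from the question, the closest built-in mechanism is XCom: the task pushes the counts, and a downstream task (or anything reading the metadata database) pulls them. A minimal sketch, assuming plain PythonOperator tasks; the function and task names here are hypothetical:

    import logging


    def push_counts(src_read_count, tgt_write_count, **context):
        # Push the business counts so other tasks can retrieve them via XCom
        ti = context["ti"]
        ti.xcom_push(key="src_read_count", value=src_read_count)
        ti.xcom_push(key="tgt_write_count", value=tgt_write_count)


    def report_counts(**context):
        # Pull the counts pushed by the upstream task ("push_counts" is a hypothetical task_id)
        ti = context["ti"]
        src = ti.xcom_pull(task_ids="push_counts", key="src_read_count")
        tgt = ti.xcom_pull(task_ids="push_counts", key="tgt_write_count")
        logging.info("source reads: %s, target writes: %s", src, tgt)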