Tags: pyspark · databricks · azure-application-insights · open-telemetry · azure-monitoring

How do you write data from a Spark DataFrame to Azure Application Insights from Databricks using Python?


I have a Spark DataFrame that holds some reconciliation data; this DataFrame is only created if reconciliation fails.

When this DataFrame is created in Databricks, I need to send the data in it to Azure Application Insights.

import json

try:
    # Check the count of the reconciliation DataFrame and send it to Application
    # Insights if count > 0, meaning there are one or more reconciliation errors.
    if reconciliation_DF.count() > 0:
        reconciliation_DF_json = reconciliation_DF.toJSON().collect()  # Convert DataFrame rows to JSON strings
        reconciliation_DF_json = json.dumps(reconciliation_DF_json)    # Combine the list of JSON strings into one string for logging

        # Some code here to write this "reconciliation_DF_json" data to Azure Application Insights???
        # This is where I am stuck.

    else:
        print("No reconciliation errors found")

except Exception as e:
    print("An error occurred: {}".format(e))
    raise

There seems to be a library, OpenTelemetry, but the Microsoft documentation isn't clear on how to use it for my scenario.

A Python code example of how to do this with a DataFrame named "reconciliation_DF" would be much appreciated.


Solution

  • I work on Azure Monitor's Python OpenTelemetry solutions; let me know if this helps. Using the azure-monitor-opentelemetry library, you can instrument common libraries and Python logging via a single configure_azure_monitor() call. You can then use the standard Python logging library to send traces to Application Insights:

    import json
    import logging

    from azure.monitor.opentelemetry import configure_azure_monitor

    configure_azure_monitor(connection_string=CONN_STR)
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    try:
        # Check the count of the reconciliation DataFrame and send it to Application
        # Insights if count > 0, meaning there are one or more reconciliation errors.
        if reconciliation_DF.count() > 0:
            reconciliation_DF_json = reconciliation_DF.toJSON().collect()  # Convert DataFrame rows to JSON strings
            reconciliation_DF_json = json.dumps(reconciliation_DF_json)    # Combine the list into one string for logging

            logger.info(reconciliation_DF_json)

        else:
            print("No reconciliation errors found")

    except Exception as e:
        print("An error occurred: {}".format(e))
        raise
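    If you would rather have each reconciliation error show up as its own trace record in Application Insights, you could log the rows one at a time instead of as a single combined string. A minimal sketch, using a plain list to stand in for the output of `reconciliation_DF.toJSON().collect()` (the row contents below are made up for illustration):

```python
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Stand-in for reconciliation_DF.toJSON().collect(), which returns
# one JSON string per DataFrame row.
collected_rows = [
    '{"id": 1, "status": "mismatch"}',
    '{"id": 2, "status": "missing"}',
]

# Emit one log record per row; with configure_azure_monitor() called
# beforehand, each record is exported as a separate trace.
for row_json in collected_rows:
    logger.info(row_json)
```

    Whether one record or one record per row is better depends on how you plan to query the data in Application Insights.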
    

    Note that you can also set the connection string with the APPLICATIONINSIGHTS_CONNECTION_STRING environment variable instead of passing it in code. There may be a few ways to do what you are trying to do. Please let me know if this fits your use case.
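    For example, with the environment variable set, configure_azure_monitor() can be called with no connection_string argument. A small sketch (the connection string below is a placeholder, not a real key; in Databricks you would typically set this via the cluster's environment variables or a secret scope rather than in code):

```python
import os

# Placeholder connection string; substitute the value from your
# Application Insights resource in the Azure portal.
os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"] = (
    "InstrumentationKey=00000000-0000-0000-0000-000000000000"
)

# configure_azure_monitor() reads this variable when it is called
# without an explicit connection_string argument:
#   configure_azure_monitor()
print(os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"])
```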