Tags: azure, pyspark, databricks, azure-synapse, azure-log-analytics

PySpark - Is it possible to write a PySpark DataFrame to a custom log table in a Log Analytics workspace?


I have a PySpark DataFrame that contains information about the tables in my SQL database (creation date, number of rows, etc.).

Sample data:

{
   "Day":"2023-04-28",
   "Environment":"dev",
   "DatabaseName":"default",
   "TableName":"discount",
   "CountRows":31253
}

I want to write this DataFrame to a custom log table that I created in a Log Analytics workspace. Is it possible?

Thank you!


Solution

  • You can use the code below to write data into a custom log table.

    import base64
    import hashlib
    import hmac
    import requests
    import datetime
    
    
    def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
        # Build the SharedKey Authorization header: an HMAC-SHA256 over the request's string-to-sign
        x_headers = 'x-ms-date:' + date
        string_to_hash = '\n'.join([method, str(content_length), content_type, x_headers, resource])
        bytes_to_hash = bytes(string_to_hash, encoding='utf-8')
        key_bytes = base64.b64decode(shared_key)
        decoded_hash = hmac.new(key_bytes, bytes_to_hash, digestmod=hashlib.sha256).digest()
        encoded_hash = base64.b64encode(decoded_hash).decode('utf-8')
        authorization = f'SharedKey {customer_id}:{encoded_hash}'
        return authorization
    
    def post_log_analytics_data(customer_id, shared_key, body, log_type):
        method = 'POST'
        content_type = 'application/json'
        resource = '/api/logs'
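        # utcnow() is deprecated; use a timezone-aware UTC timestamp in RFC 1123 format
        rfc1123date = datetime.datetime.now(datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
        # Sign and send the UTF-8 bytes so the signed length matches the payload for non-ASCII text
        if isinstance(body, str):
            body = body.encode('utf-8')
        content_length = len(body)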
        
        signature = build_signature(
                        customer_id=customer_id,
                        shared_key=shared_key,
                        date=rfc1123date,
                        content_length=content_length,
                        method=method,
                        content_type=content_type,
                        resource=resource)
                        
        uri = f'https://{customer_id}.ods.opinsights.azure.com{resource}?api-version=2016-04-01'
        headers = {
            'Authorization': signature,
            'Log-Type': log_type,
            'x-ms-date': rfc1123date,
            'Content-Type': content_type
        }
        response = requests.post(uri, headers=headers, data=body)
        return response.status_code
    
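    # df, workspace_id, primary_key and log_type are placeholders for your own values.
    # toJSON() yields one JSON string per row; join them into a JSON array so every row is sent,
    # not only the first one.
    json_data = '[' + ','.join(df.toJSON().collect()) + ']'
    status_code = post_log_analytics_data(workspace_id, primary_key, json_data, log_type)
    print(status_code)  # 200 means the request was accepted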
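For a larger DataFrame, a single request may become too big. The following is a minimal sketch, not part of the original answer, of splitting the per-row JSON strings into several JSON array payloads. The helper name `batch_json_rows` and the 25 MB default are assumptions for illustration; the default is chosen to stay under the 30 MB per-request limit documented for the HTTP Data Collector API.

```python
import json

# Assumption: stay below the documented 30 MB per-request limit with some headroom.
MAX_BATCH_BYTES = 25 * 1024 * 1024


def batch_json_rows(rows, max_bytes=MAX_BATCH_BYTES):
    """Group JSON object strings into JSON array payloads of at most max_bytes.

    `rows` is an iterable of JSON strings, such as the list returned by
    df.toJSON().collect(). A single row larger than max_bytes is still
    emitted on its own rather than dropped.
    """
    batch, size = [], 2  # 2 bytes for the surrounding "[" and "]"
    for row in rows:
        row_bytes = len(row.encode("utf-8")) + 1  # +1 for the separating comma
        if batch and size + row_bytes > max_bytes:
            yield "[" + ",".join(batch) + "]"
            batch, size = [], 2
        batch.append(row)
        size += row_bytes
    if batch:
        yield "[" + ",".join(batch) + "]"


# Small demonstration with a tiny limit so that the split is visible.
demo_rows = ['{"a":1}', '{"a":2}', '{"a":3}']
demo_batches = list(batch_json_rows(demo_rows, max_bytes=20))
print(demo_batches)
```

Each yielded string could then be passed as `body` to `post_log_analytics_data`, one request per batch.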
    

    The results are shown below:

    [two screenshots of the results; images not available]
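If the request is rejected, it can help to check the signing step offline before suspecting the data. The sketch below repeats the signing steps from `build_signature` in the answer and runs them with made-up values, so it needs no network access. The workspace ID, key, date and body here are hypothetical and exist only for illustration.

```python
import base64
import hashlib
import hmac


def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    # Same steps as in the answer: join the request parts with newlines,
    # HMAC-SHA256 them with the base64-decoded key, then base64-encode the digest.
    string_to_hash = "\n".join(
        [method, str(content_length), content_type, "x-ms-date:" + date, resource]
    )
    key_bytes = base64.b64decode(shared_key)
    digest = hmac.new(key_bytes, string_to_hash.encode("utf-8"), digestmod=hashlib.sha256).digest()
    return f"SharedKey {customer_id}:{base64.b64encode(digest).decode('utf-8')}"


# Hypothetical values for illustration only; not a real workspace ID or key.
dummy_workspace_id = "00000000-0000-0000-0000-000000000000"
dummy_key = base64.b64encode(b"not-a-real-key").decode("utf-8")
body = '{"TableName":"discount","CountRows":31253}'.encode("utf-8")

header = build_signature(
    dummy_workspace_id, dummy_key,
    "Fri, 28 Apr 2023 00:00:00 GMT", len(body),
    "POST", "application/json", "/api/logs",
)

# Split the header back into its parts to confirm that it is well formed.
scheme, _, credentials = header.partition(" ")
workspace, _, signature_b64 = credentials.partition(":")
print(scheme, workspace, len(base64.b64decode(signature_b64)))
```

The decoded signature should be 32 bytes long, the size of a SHA-256 digest. A different length, or an exception from `base64.b64decode`, usually points to a malformed key rather than a problem with the payload.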