Search code examples
python json azure-functions postman azure-eventhub

How to convert from an avro file to a json file, which was originally sent as raw json via Postman through Azure Event Hub?


So the issue is that I initially send this as raw data via Postman:

Raw data as JSON sent via Postman:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

Once this is caught by Event Hub Capture, it is converted to an Avro file. I am trying to retrieve the data by using fastavro and converting it to JSON format. The problem is that I am not getting back the same raw data that was initially sent by Postman, and I can't find a way to convert it back to its original state. Why does the Avro file also contain additional information from Postman? I probably need to find a way to convert only the "Body". But for some reason, it also adds "bytes" inside the body. I am just trying to get back the original raw data that was sent via Postman.

__init__.py (Azure function)

    import logging
    import os
    import string
    import json
    import uuid
    import avro.schema
    import tempfile
    import azure.functions as func
    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    from fastavro import reader, json_writer
    
    
    # Because the Apache Python avro package is written in pure Python, it is relatively slow; therefore I make use of fastavro.
    def avroToJson(avroFile):
        """Convert an Avro container file to JSON text on disk.

        Reads the records of ``avroFile`` with fastavro and writes them, using
        the schema the file was written with, to ``json_file.json`` (a path
        relative to the current working directory). Any existing
        ``json_file.json`` is overwritten.

        Args:
            avroFile: Path of the Avro file to read.
        """
        # The output file is opened (and truncated) before the input is opened.
        with open("json_file.json", "w") as out_handle, open(avroFile, "rb") as in_handle:
            records = reader(in_handle)
            json_writer(out_handle, records.writer_schema, records)
    
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
        """HTTP-triggered entry point: convert captured Avro blobs to JSON.

        Lists the blobs of the configured container, downloads every blob that
        is larger than an empty capture file into the current working
        directory, converts it with ``avroToJson`` and returns the JSON text
        produced for the last blob that was processed.

        Args:
            req: The incoming HTTP request (not inspected).

        Returns:
            A 200 response whose body is the content of ``json_file.json``
            after the last conversion, or a 404 response when the container
            holds no non-empty blob.
        """
        logging.info('Python HTTP trigger function processed a request.')
        print('Processor started using path ' + os.getcwd())
        connect_str = "###########"
        container = ContainerClient.from_connection_string(connect_str, container_name="####")

        # Stays None when no blob qualifies, so the final return never reads
        # an unbound name.
        jsonStr = None
        for blob in container.list_blobs():
            # A blob of exactly 508 bytes is treated as an empty capture file,
            # so only larger blobs are processed.
            if blob.size <= 508:
                continue
            print('Downloaded a non empty blob: ' + blob.name)
            blob_client = container.get_blob_client(blob=blob.name)
            # Flatten the blob's virtual folders into a single file name and
            # join it portably instead of hard-coding a Windows separator.
            cleanName = os.path.join(os.getcwd(), blob.name.replace('/', '_'))
            with open(cleanName, "wb") as my_file:
                my_file.write(blob_client.download_blob().readall())

            avroToJson(cleanName)
            # Each conversion overwrites json_file.json, so only the last
            # processed blob ends up in the response.
            with open('json_file.json', 'r') as file:
                jsonStr = file.read()

        if jsonStr is None:
            return func.HttpResponse(
                "No non-empty blobs were found in the container.",
                status_code=404,
            )

        # NOTE(review): this text is the JSON rendering of the whole captured
        # record, so it appears to include Event Hub metadata around the
        # original payload - confirm whether callers want only the payload.
        return func.HttpResponse(jsonStr, status_code=200)

Expected result:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

Actual result:

{
   "SequenceNumber":19,
   "Offset":"10928",
   "EnqueuedTimeUtc":"4/1/2021 8:43:19 AM",
   "SystemProperties":{
      "x-opt-enqueued-time":{
         "long":1617266599145
      }
   },
   "Properties":{
      "Postman-Token":{
         "string":"37ff4cc6-9124-45e5-ba9d-######e"
      }
   },
   "Body":{
      "bytes":"{\r\n  \"id\": 1,\r\n  \"receiver\": \"2222222222222\",\r\n  \"message\": {\r\n    \"Name\": \"testing\",\r\n    \"PersonId\": 2,\r\n    \"CarId\": 2,\r\n    \"GUID\": \"1s3q1d-s546dq1-8e22e\",\r\n    \"LineId\": 2,\r\n    \"SvcId\": 2,\r\n    \"Lat\": -64.546547,\r\n    \"Lon\": -64.546547,\r\n    \"TimeStamp\": \"2021-03-18T08:29:36.758Z\",\r\n    \"Recorder\": \"dq65ds4qdezzer\",\r\n    \"Env\": \"DEV\"\r\n  },\r\n  \"operator\": 20404,\r\n  \"sender\": \"MSISDN\",\r\n  \"binary\": 1,\r\n  \"sent\": \"2021-03-29T08:29:36.758Z\"\r\n}"
   }
}

This question was originally posted under the thread "Alternative to Azure Event Hub Capture for sending Event Hub messages to Blob Storage?", because another question emerged from the initial issue.

In case this is not the way to proceed on StackOverflow, please feel free to comment on how I should handle this next time. Kind regards.


Solution

  • Try returning only the body of the record:

    return func.HttpResponse(json.loads(jsonStr)['body']['bytes'], status_code=200)