Search code examples
pythonazure-eventhub

Managing event hubs in Azure from python


I'm trying to write python code to be able to create / delete event hubs on Azure Event Hub from python scripts. I have managed to create an EventHubManagementClient following the documentation on this page. I believe I now need to use the EventHubsOperations Class as documented here.

I have 2 challenges:

  1. what would be 'aaaa' in the "from aaaa import EventHubsOperations" line so as to be able to refer to the class? I can't seem to find how to call the respective package to import the class...
  2. What values to pass for config, serializer and deserializer that all are required values when using the class? Perhaps someone can share an example of how to use this class?

Ideally I'm looking to call the create_or_delete method to create a new event hub or to delete an existing event hub from the python script. If someone can share how this code should be extended to achieve this, I'd appreciate it. The documentation seems increadibly light: "config, Required, Configuration of service client"...

My code is as follows:

import setenv
import os
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

setenv.import_env_vars('')

vault_url = os.environ["KEY_VAULT_URL"]
subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"]

credential = DefaultAzureCredential()

print('Creating EH_client...')
EH_client = EventHubManagementClient(vault_url, credential, subscription_id, base_url=None)
print('Created.')

EventHubsOperations(EH_client)

The resulting output is as follows:

Project root: 
filename: env_values
Creating EH_client...
Created.
Traceback (most recent call last):
  File "/home/db533/gitRepos/GunaBot2/azure-mgmt/azure_test.py", line 25, in <module>
    EventHubsOperations(EH_client)
NameError: name 'EventHubsOperations' is not defined

Process finished with exit code 1

Solution

  • Here's my code that worked for creating and deleting eventhubs from python.

    I use a separate script (setenv.py) to load environment variables that are stored in a text file.

    import os
    import setenv
    from azure.mgmt.eventhub import EventHubManagementClient
    from azure.mgmt.resource import ResourceManagementClient
    from azure.common.credentials import ServicePrincipalCredentials
    from azure.mgmt.storage import StorageManagementClient
    from azure.mgmt.storage.models import (StorageAccountCreateParameters,Sku,SkuName,Kind)
    
    set_env_path="C:\\Users\\db533\\PycharmProjects\\GunaBot2\\shared_files\\"
    setenv.import_env_vars(set_env_path,'env_values')
    
    def main():
        SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
        GROUP_NAME = "annabot-eventhub2"
        STORAGE_ACCOUNT_NAME = "storageaccountxyztest"
        NAMESPACE_NAME = "annabot-eventhub999"
        EVENTHUB_NAME = "worker99901"
    
        tenant_id = os.environ["AZURE_TENANT_ID"]
        client_id = os.environ["AZURE_CLIENT_ID"]
        client_secret = os.environ["AZURE_CLIENT_SECRET"]
        print('AZURE_CLIENT_SECRET:',client_secret)
    
        credential_common = ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=client_secret)
    
        # Create client
        print(" Create resource client...")
        resource_client = ResourceManagementClient(credential_common, SUBSCRIPTION_ID)
    
        print(" Create Event hub client...")
        eventhub_client = EventHubManagementClient(credential_common,SUBSCRIPTION_ID)
    
        print(" Create storage client...")
        storage_client = StorageManagementClient(credential_common,SUBSCRIPTION_ID)
    
        # Create resource group
        print(" Create resource group...")
        resource_client.resource_groups.create_or_update(
            GROUP_NAME,
            {"location": "germanywestcentral"}
        )
    
        # Create StorageAccount
        print(" Create storageAccount...")
        storage_async_operation = storage_client.storage_accounts.create(
            GROUP_NAME,
            STORAGE_ACCOUNT_NAME,
            StorageAccountCreateParameters(
                sku=Sku(name=SkuName.standard_lrs),
                kind=Kind.storage_v2,
                location='germanywestcentral'
            )
        )
        storage_account = storage_async_operation.result()
    
        # Create Namespace
        print(" Create event hub namespace...")
        eventhub_client.namespaces.create_or_update(
            GROUP_NAME,
            NAMESPACE_NAME,
            {
              "sku": {
                "name": "Standard",
                "tier": "Standard"
              },
              "location": "Germany West Central",
              "tags": {
                "tag1": "value1",
                "tag2": "value2"
              },
              "kafka_enabled": "True"
            }
        ).result()
    
        # Create EventHub
        print(" Create event hub...")
        eventhub = eventhub_client.event_hubs.create_or_update(
            GROUP_NAME,
            NAMESPACE_NAME,
            EVENTHUB_NAME,
            {
              "message_retention_in_days": "4",
              "partition_count": "4",
              "status": "Active",
              "capture_description": {
                "enabled": True,
                "encoding": "Avro",
                "interval_in_seconds": "120",
                "size_limit_in_bytes": "10485763",
                "destination": {
                  "name": "EventHubArchive.AzureBlockBlob",
                  "storage_account_resource_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + GROUP_NAME + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "",
                  "blob_container": "container",
                  "archive_name_format": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
                }
              }
            }
        )
        print("Created EventHub: {}".format(eventhub))
    
        # Get EventHub
        eventhub = eventhub_client.event_hubs.get(
            GROUP_NAME,
            NAMESPACE_NAME,
            EVENTHUB_NAME
        )
        print("get() for EventHub: {}\n".format(eventhub))
    
        #Create authorisation rule
        eventhub_rule = eventhub_client.event_hubs.create_or_update_authorization_rule(
            GROUP_NAME,
            NAMESPACE_NAME,
            event_hub_name=EVENTHUB_NAME,
            authorization_rule_name="manager",
            rights=["LISTEN","SEND"]
        )
        print("create_or_update_authorization_rule() for Manager for EventHub: {}\n".format(eventhub_rule))
    
        # Get authorisation rule
        eventhub_rule2 = eventhub_client.event_hubs.get_authorization_rule(
            GROUP_NAME,
            NAMESPACE_NAME,
            event_hub_name=EVENTHUB_NAME,
            authorization_rule_name="manager"
        )
        print("get_authorization_rule() for manager for EventHub: {}\n".format(eventhub_rule2))
    
        # List keys
        namespace_keys = eventhub_client.event_hubs.list_keys(
            GROUP_NAME,
            NAMESPACE_NAME,
            event_hub_name=EVENTHUB_NAME,
            authorization_rule_name="manager"
        )
        print("list_keys() for EventHub: {}\n".format(namespace_keys))
        print("namespace_keys.primary_connection_string:",namespace_keys.primary_connection_string)
    
    # Delete EventHub
        eventhub_client.event_hubs.delete(
            GROUP_NAME,
            NAMESPACE_NAME,
            EVENTHUB_NAME
        )
        print("Delete EventHub.")
    
        # Delete Namespace
        eventhub_client.namespaces.delete(
            GROUP_NAME,
            NAMESPACE_NAME
        ).result()
    
        # Delete StorageAccount
        storage_client.storage_accounts.delete(
            GROUP_NAME,
            STORAGE_ACCOUNT_NAME
        )
    
        # Delete resource group
        resource_client.resource_groups.delete(
            GROUP_NAME
        ).result()
    
    
    if __name__ == "__main__":
        main()
    

    The setenv.py script to load the environment variables is as follows. (I got this from another answer. Can't take credit for this...):

    import os
    
    def import_env_vars(env_folder,env_filename):
        """Imports some environment variables from a special .env file in the
        project root directory.
        """
        print("env_folder:",env_folder)
        if len(env_folder) > 0 and env_folder[-1] != '\\':
            env_folder += '\\'
        try:
            print("filename:",env_folder+env_filename)
            envfile = open(env_folder+env_filename, "r")
    
        except IOError:
            raise Exception("You must have a {0} file in your project root "
                            "in order to run the server in your local machine. "
                            "This specifies some necessary environment variables. ")
        for line in envfile.readlines():
            [key,value] = line.strip().split("=")
            os.environ[key] = value
            print("key:",key)
            print("value:", value)
    

    Environment variables are defined in the file as follows:

    EVENTHUB_SERVER=gunabot-eventhub.servicebus.windows.net
    DEV_STAGE=Dev
    AZURE_SUBSCRIPTION_ID=xxxxxxxxx-xxxx-xxxxxxx-xxxxx-xxxx
    AZURE_TENANT_ID=yyyyyyyyy-yyyyy-yyyyyy-yyyyyy
    AZURE_CLIENT_ID=zzzzzz-zzzzzz-zzzzzz-zzzzzzz-zzz
    AZURE_CLIENT_SECRET=qqqqq-qqqq-qqqqqqq-qqqqq-qqqqq
    

    Hope this helps others.