python-3.x azureservicebus

How do I create an Azure Service Bus subscription filter using Python?


I am able to list the rules for a specified topic subscription and print the properties of each rule. Snippet:

    from azure.servicebus.management import ServiceBusAdministrationClient, CorrelationRuleFilter

    servicebus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(CONNECTION_STR)
    for rule_properties in servicebus_mgmt_client.list_rules(TOPIC_NAME, SUBSCRIPTION_NAME):
        print(rule_properties)

This returns something like:

    {'name': 'customer', 'filter': <azure.servicebus.management._models.CorrelationRuleFilter object at 0x104877760>, 'action': None, 'created_at_utc': datetime.datetime(2023, 7, 17, 12, 59, 8, 164754, tzinfo=<isodate.tzinfo.Utc object at 0x1047d2b80>)}

Questions:

1. How do I see the specifics of the filter? I want to know its type ("CorrelationFilter") and its custom properties (key/value).

2. How do I create a rule? Something like:

servicebus_mgmt_client.create_rule(TOPIC_NAME, SUBSCRIPTION_NAME, myfilter)

But what should the syntax of myfilter be? I suspect it has something to do with CorrelationRuleFilter.

The Microsoft GitHub documentation for ServiceBusAdministrationClient says nothing about creating or deleting filters in a subscription, and the answers I found through Google are either for .NET or outdated.

Otherwise, reading messages, listing subscriptions, and checking the DLQ all work.


Solution

  • With the sample code below I was able to print each rule's filter type and custom properties, and to create a rule 'new-rule' with a 'CorrelationRuleFilter' filter.

    from azure.servicebus.management import (
        ServiceBusAdministrationClient,
        CorrelationRuleFilter,
    )
    
    CONNECTION_STR = "Endpoint=sb://hedwig-servicebus-prod.servicebus.windows.net/;"
    TOPIC_NAME = "rules-output"
    SUBSCRIPTION_NAME = "notificationengine-ts"
    
    
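    def view_filter_details(rule_filter):
        # Print the filter's class name, e.g. CorrelationRuleFilter or SqlRuleFilter.
        print(f"Filter Type: {type(rule_filter).__name__}")
        if isinstance(rule_filter, CorrelationRuleFilter):
            print("Custom Properties:")
            # properties may be None when no custom properties are set.
            for key, value in (rule_filter.properties or {}).items():
                print(f"{key}: {value}")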
    
    
    def main():
        servicebus_mgmt_client = ServiceBusAdministrationClient.from_connection_string(
            CONNECTION_STR
        )
        for rule_properties in servicebus_mgmt_client.list_rules(
            TOPIC_NAME, SUBSCRIPTION_NAME
        ):
            print(f"Rule Name: {rule_properties['name']}")
            view_filter_details(rule_properties["filter"])
    
        new_filter = CorrelationRuleFilter(correlation_id="my-correlation-id")
        new_rule_name = "new-rule"
        servicebus_mgmt_client.create_rule(
            TOPIC_NAME, SUBSCRIPTION_NAME, new_rule_name, filter=new_filter
        )
        print(f"Rule '{new_rule_name}' with filter '{type(new_filter).__name__}' created.")
    
    
    if __name__ == "__main__":
        main()
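
For question 1, a correlation filter can also match on system fields, not only on custom properties. The sketch below is a hypothetical helper (`describe_filter` is not part of the SDK) that collects whatever is set on a filter object into a plain dict. It assumes the attribute names exposed by the azure-servicebus 7.x filter models (`correlation_id`, `message_id`, `to`, `reply_to`, `label`, `session_id`, `reply_to_session_id`, `content_type`, `properties`, and `sql_expression` on SQL filters); check them against your installed version.

```python
# Assumed attribute names of CorrelationRuleFilter in azure-servicebus 7.x.
CORRELATION_FIELDS = (
    "correlation_id", "message_id", "to", "reply_to", "label",
    "session_id", "reply_to_session_id", "content_type",
)


def describe_filter(rule_filter):
    """Return a plain dict summarising a rule filter object (hypothetical helper)."""
    summary = {"type": type(rule_filter).__name__}
    # System fields of a correlation filter; fields that are unset are skipped.
    for field in CORRELATION_FIELDS:
        value = getattr(rule_filter, field, None)
        if value is not None:
            summary[field] = value
    # Custom key/value properties; may be None or empty.
    properties = getattr(rule_filter, "properties", None)
    if properties:
        summary["properties"] = dict(properties)
    # SQL-style filters carry an expression instead of correlation fields.
    sql_expression = getattr(rule_filter, "sql_expression", None)
    if sql_expression is not None:
        summary["sql_expression"] = sql_expression
    return summary
```

Inside the `list_rules` loop above it would be called as `print(rule_properties.name, describe_filter(rule_properties.filter))`. It uses `getattr` rather than `isinstance` so the same function works for any filter class without importing them all.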
    
    

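
One thing to watch when adding a rule: a subscription created without an explicit rule normally has a catch-all rule named `$Default`, and as long as it exists the subscription still receives every message. A minimal sketch of adding a rule and then dropping that default is below. `replace_default_rule` is a hypothetical helper, not an SDK function; it only relies on the `list_rules`, `create_rule` and `delete_rule` methods of `ServiceBusAdministrationClient`.

```python
DEFAULT_RULE_NAME = "$Default"  # name Service Bus gives the catch-all rule


def replace_default_rule(admin_client, topic_name, subscription_name,
                         rule_name, rule_filter):
    """Create `rule_name` if missing, then delete the catch-all default rule."""
    existing = [
        rule.name
        for rule in admin_client.list_rules(topic_name, subscription_name)
    ]
    # Create the new rule first so the subscription is never left without one.
    if rule_name not in existing:
        admin_client.create_rule(
            topic_name, subscription_name, rule_name, filter=rule_filter
        )
    # Remove the default rule so that only the new filter decides delivery.
    if DEFAULT_RULE_NAME in existing:
        admin_client.delete_rule(
            topic_name, subscription_name, DEFAULT_RULE_NAME
        )
```

With the real client from the code above, a filter on a custom property could be passed as `replace_default_rule(servicebus_mgmt_client, TOPIC_NAME, SUBSCRIPTION_NAME, "customer-contoso", CorrelationRuleFilter(properties={"customer": "contoso"}))`. The rule name and the property key and value here are made up for illustration.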