Search code examples
pythonpython-asyncioopc-ua

opcua-asyncio: Subscriptions to multiple addresses


I'm making a monitoring client using opcua-asyncio. The goal is to keep track of the values of a list of nodes for a list of machines. But there are no examples in the documentation on how to manage multiple connections, and I'm a bit lost.

The subscription works fine for one machine :

import sys
sys.path.insert(0, "..")
import time
import asyncio
from asyncua import Client, ua, Node

# Endpoint address of the single machine this example connects to; it is
# passed to Client(url=...) in main().
url = 'opc.tcp://192.168.200.204:8999'

class SubscriptionHandler:
    """Callback object handed to ``create_subscription``; prints each new value."""

    def datachange_notification(self, node: Node, val, data):
        """Print the string form of ``val``; ``node`` and ``data`` are not used."""
        text = str(val)
        print(text)
        
async def main():
    """Connect to ``url``, subscribe to two nodes and keep the session open.

    The coroutine never returns on its own: the trailing sleep loop only
    keeps execution inside the ``async with`` block.
    """
    node_ids = (
        "ns=2;s=CncData/MachineStatus/Status",
        "ns=2;s=CncData/MachineStatus/Part/PieceToDo",
    )
    opc_client = Client(url=url)
    async with opc_client:
        sub = await opc_client.create_subscription(500, SubscriptionHandler())
        watched = [opc_client.get_node(node_id) for node_id in node_ids]
        await sub.subscribe_data_change(watched)
        # Nothing else to do here; idle in one-second steps so the block
        # is never left.
        while True:
            await asyncio.sleep(1)

# Start the event loop and run main() only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

The result is the subscriptions being created, then the two values returned, so everything's good.

Result

The thing is, I can't simply add a loop that iterates over a list of addresses at the start of main, because the subscription is polled with a "while True" loop. This is what I tried:

# Endpoint addresses to monitor; main() creates one Client per entry.
# NOTE(review): `list_of_addresses` is not defined in this snippet — presumably
# pseudo-code standing in for a flat list of endpoint URL strings.
ENDPOINT = [list_of_addresses]

class SubscriptionHandler:
    """Data-change handler passed to each client's subscription."""

    def datachange_notification(self, node: Node, val, data):
        """Write ``str(val)`` to standard output, followed by a newline."""
        rendered = str(val)
        print(rendered)

# Module-level registries filled in by main(): one Client per endpoint, and
# the value returned by each subscribe_data_change() call.
clients = []
handles = []
async def main():
    """Build one client per endpoint, subscribe on each, then idle forever.

    NOTE: every ``async with`` block below ends at the end of its own loop
    iteration, i.e. before the idle loop is reached, so no client context
    is still open while this coroutine waits.
    """
    try:
        clients.extend(Client(url=address) for address in ENDPOINT)
        for opc_client in clients:
            async with opc_client:
                status_node = opc_client.get_node("ns=2;s=CncData/MachineStatus/Status")
                # Create the client subscription with a fresh handler.
                sub = await opc_client.create_subscription(500, SubscriptionHandler())
                piece_node = opc_client.get_node(
                    "ns=2;s=CncData/MachineStatus/Part/PieceToDo"
                )
                # Subscribe to data changes for the two nodes (variables).
                handle = await sub.subscribe_data_change([status_node, piece_node])
                handles.append(handle)
        # Idle loop, entered only after all the blocks above have been left.
        while True:
            await asyncio.sleep(1000)
    except (ConnectionError, ua.UaError):
        print("CONNEXION ERROR")
        await asyncio.sleep(2)

# Start the event loop and run main() only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

The subscriptions are indeed created, but the subscription's handlers are never called :

Results

I don't understand where to put the "while True" that is essential for polling.


Solution

  • You can make a function for the connection and create an asyncio task for each client.

    async def connection_loop(add):
        """Keep a subscribed connection to the endpoint ``add`` alive.

        Connects, subscribes to the two monitored nodes and then checks the
        connection every two seconds. When the connection fails (either while
        connecting or later), the error is reported and, after a two-second
        pause, a new connection attempt is made. The coroutine therefore only
        ends when its task is cancelled.

        Args:
            add: Endpoint URL passed to ``Client(url=...)``.
        """
        # The outer loop is what makes this a *loop*: without it the coroutine
        # would return after the first connection error and never reconnect.
        while True:
            # A new client object is used for every connection attempt.
            client = Client(url=add)
            try:
                async with client:
                    var = client.get_node("ns=2;s=CncData/MachineStatus/Status")
                    handler = SubscriptionHandler()
                    # We create a Client Subscription.
                    subscription = await client.create_subscription(500, handler)
                    nodes = [
                        var,
                        client.get_node("ns=2;s=CncData/MachineStatus/Part/PieceToDo"),
                    ]
                    # We subscribe to data changes for two nodes (variables).
                    # NOTE: one entry is appended per successful (re)connection.
                    handles.append(await subscription.subscribe_data_change(nodes))
                    while True:
                        await asyncio.sleep(2)
                        # Throws an exception if the connection is lost.
                        await client.check_connection()
            except (ConnectionError, ua.UaError):
                print("CONNEXION ERROR")
                # Short pause before the outer loop tries to reconnect.
                await asyncio.sleep(2)
    
    async def main():
        """Start one ``connection_loop`` task per endpoint and wait on all of them."""
        await asyncio.gather(
            *(asyncio.create_task(connection_loop(endpoint)) for endpoint in ENDPOINT)
        )