Search code examples
opc-uawatchdog

opcua-asyncio solution to error in watchdog loop


I have the problem that the connection between my client and an OPC UA server is interrupted every few days. The server publishes new values every two seconds, which the client receives via a pub sub and stores in a JSON.

For the client I use a Raspberry Pi 4 with Python and opcua-asyncio. Server and client are in the same network. The server runs via KEPServerEX6.

That the client loses the connection for a few minutes is actually not a big problem. The only problem is that it stops running completely after the error and I have to restart it manually.

I have already tried more or less desperately in the SubHandler to restart the program in case of an error message. But of course it is not nice and does not always work.

Here is some part of the code (or most of it).

class SubHandler:
    # Handle data that is received for the subscriptions
    def __init__(self, crude, acid, base, water, waste, waste_mass, centri_1_IN, centri_2_IN, centri_1_OUT, centri_2_OUT):
        self.crude = crude
        self.acid = acid

    def datachange_notification(self, node: Node, val, data):
        if node == self.crude:
            write_file("crude_flow", val)
        elif node == self.acid:
            write_file("acid_flow", val)
            
    
    def event_notification(self, event: ua.EventNotificationList):
        _logger.warning("event_notification %s", status)
        pass
    
    def status_change_notification(self, status: ua.StatusChangeNotification):
        _logger.warning("status_change_notification %s", status)
        if status != 0:
            _logger.warning("Some bad status change, restarting program")
            python = sys.executable
            os.execl(python, python, *sys.argv)
            
        pass
...

async def main():
    while True:
        client = Client(url=url_scada)
        try:
            async with client:
                _logger.warning("Connected")

                crude = client.get_node(nodeID_crude)
                acid = client.get_node(nodeID_acid)

                nodeIDs = [crude, acid]

                handler = SubHandler(crude, acid)
                sub = await client.create_subscription(500, handler)

                # Filter erstellen
                filter = ua.DataChangeFilter(Trigger=ua.DataChangeTrigger.StatusValueTimestamp)

                # MonitoredItems mit Filter hinzufügen
                for node_id in nodeIDs:
                    handle = await sub._subscribe(
                        node_id,
                        ua.AttributeIds.Value,
                        mfilter=filter
                    )

                while True:
                    await asyncio.sleep(0.5)
                    await client.check_connection()
        except(ConnectionError, ua.Ua.Error):
            _logger.warning("Reconnecting in 1 second")
            await asyncio.sleep(1)
...

And here is some part of the error. During the error handling other errors occur and ultimately the program stops.

ERROR:asyncua.client.client:Error in watchdog loop
Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/client.py", line 457, in _monitor_server_loop
    _ = await self.nodes.server_state.read_value()
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 179, in read_value
    result = await self.read_data_value()
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 190, in read_data_value
    return await self.read_attribute(ua.AttributeIds.Value, None, raise_on_bad_status)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 304, in read_attribute
    result = await self.session.read(params)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 397, in read
    data = await self.protocol.send_request(request)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 160, in send_request
    data = await asyncio.wait_for(self._send_request(request, timeout, message_type), timeout if timeout else None)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 423, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
INFO:asyncua.common.subscription:Publish callback called with result: PublishResult(SubscriptionId=9, AvailableSequenceNumbers=[], MoreNotifications=True, NotificationMessage_=NotificationMessage(SequenceNumber=0, PublishTime=datetime.datetime(2023, 5, 30, 17, 39, 59, 457322), NotificationData=[StatusChangeNotification(Status=2148270080, DiagnosticInfo_=DiagnosticInfo(SymbolicId=None, NamespaceURI=None, Locale=None, LocalizedText=None, AdditionalInfo=None, InnerStatusCode=None, InnerDiagnosticInfo=None))]), Results=[], DiagnosticInfos=[])
WARNING:__main__:status_change_notification 2148270080
WARNING:__main__:Some bad status change, restarting program
ERROR:asyncua.common.subscription:Exception calling status change handler

I tried to find a solution in the opcua-asyncio library to handle the error. But could not really find anything. Would be really nice if anyone could help.


Solution

  • The watchdog is a background task, to keep the connection alive. So if it has a error, the connection is broken. Without the watchdog you wouldn't notice a broken connection, in your code.

    Maybe the example in asyncua is not complete because it misses a exception, try this exception handler:

    except(concurrent.futures.TimeoutError, ConnectionError, ua.Ua.Error):
        _logger.warning("Reconnecting in 1 second")
        await asyncio.sleep(1)