Search code examples
pythonpython-asynciopython-multithreadingopc-ua

Python asyncua in thread get any value would delay


I want use asyncua to subscribe some variables. I use thread because I need dynamically update subscription list every once in a while. But when I received datachange notification and try to read node browse name,it will delay about 2 second. How do I fix it? Here is my code.

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
import pymssql
from threading import Thread

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    n = 1
    def __init__(self, variables_len=0):
        self._queue = asyncio.Queue()
        self.variables_len = variables_len
        conn = pymssql.connect(server='(local)', user='sa',
                               password='sa', database='AdventureWorks2005')
        self.cursor = conn.cursor()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        if self.n > self.variables_len:
            self._queue.put_nowait([node, value, data])
            print(f'Data change notification was received and queued.')
            self.n = self.variables_len+1
        self.n += 1

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()

                # *** Write your processing code ***
                print(await node.read_browse_name())   #delay here
                self.cursor.execute('select TOP(1) * from tool_list')  #test code
                row = self.cursor.fetchall()
                print(row)

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
        variables = await obj.get_variables()
        # Subscribe data change.
        handler = MyHandler(variables_len=len(variables))
        subscription = await client.create_subscription(period=0, handler=handler)
        handle = await subscription.subscribe_data_change(variables)


        # Process data change every 100ms
        async def insert_to_sql():
            while True:
                await handler.process()
                #service_level = await client.get_node('i=2267').get_value()
                await asyncio.sleep(0.1)

        def between_callback():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(some_callback())
            loop.close()

        _thread = Thread(target=between_callback)
        _thread.daemon = True
        _thread.start()

        while True:
            # Get a variable node.
            idx = await client.get_namespace_index(NAMESPACE)
            obj = await client.get_objects_node().get_child([f'{idx}:MyObject'])
            variables_new = await obj.get_variables()
            if variables != variables_new:
                handler.variables_len = len(variables_new)
                handler.n = 1
                await subscription.unsubscribe(handle)
                handle = await subscription.subscribe_data_change(variables_new)
                variables = variables_new
                print('update done')
                await asyncio.sleep(100)



if __name__ == '__main__':
    asyncio.run(main())

I tried changing **print(await node.read_browse_name()) ** to datachange_notification function but it got the same result


Solution

  • Run your own thread is tricky. I would try to use instead of a own thread to_thread