Search code examples
pythonasynchronouspython-asynciosubscriptionopc-ua

save to csv simultaneously opcua datachange notification


I subscribe to several nodes at the same time, so that I only receive a notification when the values change. I need to store the data in a csv file and I write rows with the timestamp + all values of the nodes. I implemented asyncio.Queue to handle the simultaneous writing, but still if I get more than one notification at the same time, the file gets updated with the first notification but not the others which get lost.

Here is the output:

sv_writer_task started
Queue list: 
<Queue maxsize=0>

Datachange notification received.
29032024 14:10:16Data change notification was received and queued:  ns=2;s=Unit_SB.UC3.TEMP.TEMP_v 19.731399536132812
Datachange notification received.
29032024 14:10:16Data change notification was received and queued:  ns=2;s=Unit_SB.UC3.HUMI.HUMI_v 36.523399353027344

Dequeue value:  19.731399536132812
val name:  TEMP
Prefix UC1 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.TEMP.TEMP_v. Skipping.
csv file:  UC3.csv
Appending new row
Data written to file UC3.csv succesfully

Dequeue value:  36.523399353027344
val name:  HUMI
Prefix UC1 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
Prefix UC2 is no match for node: Unit_SB.UC3.HUMI.HUMI_v. Skipping.
csv file:  UC3.csv
Updating existing row
Data written to file UC3.csv succesfully

In the csv file one can see in last line that only the TEMP value, which arrived first, was changed, but not the HUMI one:

Timestamp,OUT_G,OUT_U,HUMI,TEMP
29032024 14:08:42,True,True,47.38769912719727,15.043899536132812
29032024 14:10:16,True,True,47.38769912719727,19.731399536132812

here the code so far. Since I'm subscribing to several subsystem and each of them has it's own data file, I'm checking to which file to write:

# this is the consumer which consumes items from the queue
async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value fromt he queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)
        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")
        df = pd.read_csv(csv_file)
        last_row = df.iloc[-1].copy()
        if last_row['Timestamp'] == dte:
            print("Updating existing row")
            last_row[val_name] = val
        else:
            print("Appending new row")
            new_row = last_row.copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} succesfully')
        queue.task_done()


class SubscriptionHandler(object):
    def __init__(self, wsconn):
        self.wsconn = wsconn
        self.q = asyncio.Queue()
        self.queuwriter = asyncio.create_task(csv_writer_task(self.q))

    # this is the producer which produces items and puts them into a queue
    async def datachange_notification(self, node, val, data):
        print("Datachange notification received.")
        dte = data.monitored_item.Value.ServerTimestamp.strftime("%d%m%Y %H:%M:%S")
        await self.q.put((dte, node, val))
        print(dte + "Data change notification was received and queued: ", node, val)


class SBConnection():
    def __init__(self):
        self.listOfWSNode = []
        self.dpsList = ...

    async def connectAndSubscribeToServer(self):
        self.csv_file = ''

        async with Client(url=self.url) as self.client:
            for element in self.dpsList:
                node = "ns=" + element["NS"] + ";s=" + element["Name"]
                var = self.client.get_node(node)
                self.listOfWSNode.append(var)
            print("Subscribe to ", self.listOfWSNode)

            handler = SubscriptionHandler(self)
            sub = await self.client.create_subscription(period=10, handler=handler)

            await sub.subscribe_data_change(self.listOfWSNode)
            print('subscription created')

            # create infinite loop
            while True:
                await asyncio.sleep(0.1)


async def main():
    uc = SBConnection()
    await uc.connectAndSubscribeToServer()

if __name__ == '__main__':
    asyncio.run(main())

Can anyone help me find the problem? Thanks

Code corrected after comment:

async def csv_writer_task(queue):
    print("csv_writer_task started")
    print('Queue list: ')
    print(queue)
    print('')
    while True:
        try:
            dte, node, val = await queue.get()
            print('Dequeue value: ', val)
        except Exception as e:
            print(f"Error in csv_writer_task while retrieving value fromt he queue: {e}")

        node_id_str = str(node.nodeid.Identifier)
        node_parts = node_id_str[len("Unit_SB."):].split('.')
        val_name = node_parts[-1].replace('_v', '')
        print('val name: ', val_name)
        for key, header_row in prefix_mapping.items():
            if f"Unit_SB.{key}" in node_id_str:
                csv_file = f"{key}.csv"
                print('csv file: ', csv_file)
                break
            else:
                print(f"Prefix {key} is no match for node: {node_id_str}. Skipping.")
        df = pd.read_csv(csv_file)
        if df.iloc[-1]['Timestamp'] == dte:
            print("Updating existing row")
            df.loc[df.index[-1], val_name] = val  # Modify the last row in the dataframe directly
        else:
            print("Appending new row")
            new_row = df.iloc[-1].copy()
            new_row['Timestamp'] = dte
            new_row[val_name] = val
            df = pd.concat([df, new_row.to_frame().T], ignore_index=True)
        
        df.to_csv(csv_file, index=False)
        print(f'Data written to file {csv_file} succesfully')
        queue.task_done()

Solution

  • As in the comment, the solution is that I was modifying a copy of the last row, but I was then saving the original row. By doing so I was loosing the changes.