Search code examples
pythonazure-eventhub

Eventhub Reading data as list in python


I want to use Azure Eventhub as a middleware messaging queue. I am basically sending simulated data in list formats and receiving it in string format now.

As you can see Here, there are only a few formats in which data is convertible. I want the format of data to be a list with float data in it.

Here is the code that I am working on right now. I am trying to manipulate the line below to each event data in float form being accumulated in the list.

LIST.append(event_data.message._body)

This is the body of my code.

CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"


total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
i=1
LIST=[]
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=None)
    while batch:
        for event_data in batch[-100:]:

            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(i, last_sn))
            LIST.append(event_data.message._body)

            i += 1
            total += 1
        batch = receiver.receive(timeout=5000)

    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

You may find the eventData class in Here

=================================UPDATE ===================================

As a result, it shows 'Message [a b c ....]', and I think the Message was set to be written, so I want to remove the word 'Message' in the result format.

The "sender.py" is following:

from azure.eventhub import EventHubClient, Sender, EventData
import time
import logging
import numpy as np

logger = logging.getLogger("azure")

ADDRESS = ""
USER = "RootManageSharedAccessKey"
KEY = ""

try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")

    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
    x_value = np.arange(100)
    try:
        start_time = time.time()
        for i in range(100):

            y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
            y_value1 = np.asarray(y_value1)
            print("Sending message: {}, {}".format(i, y_value1))
            message = y_value1
            sender.send(EventData(message))
            time.sleep(0.35)
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))

except KeyboardInterrupt:
    pass

Solution

  • The fixed code following worked:

    logger = logging.getLogger("azure")
    
    ADDRESS = ""
    USER = "RootManageSharedAccessKey"
    KEY = ""
    
    try:
        if not ADDRESS:
            raise ValueError("No EventHubs URL supplied.")
        # Create Event Hubs client
        client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
        sender = client.add_sender(partition="0")
        client.run()
        forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
        x_value = np.arange(100)
        try:
            start_time = time.time()
            for i in range(100000):
    
                y_value1 = forging1(x_value) + np.random.normal(0,1,len(x_value))*3
                y_value1 = np.asarray(y_value1)
                print("Sending message: {}, {}".format(i, y_value1))
                message = "{}".format(y_value1)
                sender.send(EventData(message))
                time.sleep(0.35)
        except:
            raise
        finally:
            end_time = time.time()
            client.stop()
            run_time = end_time - start_time
            logger.info("Runtime: {} seconds".format(run_time))
    
    except KeyboardInterrupt:
        pass
    
    

    In this way, I was able to receive the messages without "messages" in it.