Python Pandas - Simulate Streaming to Kafka

I am trying to practice Kafka producing/consuming and want to set up a simulated 'stream' of data. I have tried looping with time.sleep(0.0000001), but it is too slow to catch the entries. Here is what I am trying to do:

import time

offsets = acc_df.offset.unique()
time_start = time.time()
for x in offsets:
    while time.time() - time_start != x:
        if time.time() - time_start == x:
            # one dict per row, e.g. {'id': ..., 'offset': ..., 'x': ..., ...}
            send_df = acc_df[acc_df['offset'] == x].to_dict('records')
            for record in send_df:
                send_data('accelerations', record)

If I am going about this the wrong way, please let me know! Basically, once the elapsed time reaches an offset, I want to send the rows with that offset to my Kafka topic.

Thanks in advance!

Edit: here is some raw data, as requested, instead of a picture:

id                                ride_id                           uuid                              timestamp  offset    x       y      z       timelapse  filename
58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482  19b9aa10588646b3bf22c9b4865a7995  25:03.9    0.822061  -0.994  0.045  -0.036  FALSE
58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482  19b9aa10588646b3bf22c9b4865a7995  25:03.9    0.842061  -0.998  0.046  -0.04   FALSE
58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482  19b9aa10588646b3bf22c9b4865a7995  25:03.9    0.862061  -0.999  0.047  -0.036  FALSE
58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482  19b9aa10588646b3bf22c9b4865a7995  25:03.9    0.882061  -0.999  0.045  -0.034  FALSE
58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482  19b9aa10588646b3bf22c9b4865a7995  25:03.9    0.902061  -0.999  0.048  -0.033  FALSE

  • Try playing with this example.

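    My computer runs around 500 empty iterations before reaching the first offset, then about 15-20 loop iterations between each. Notice that I use < rather than ==: the elapsed time is a float that will almost never exactly equal an offset, so an == check would need the subtraction rounded to microsecond precision, and even then the loop could sleep right past the matching instant.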

    import time
    # order doesn't matter... List will be iterated each time through while loop 
    offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
    time_start = time.time()
    counter = 0  # for debugging
    running = True
    while running:
        to_send = [o for o in offsets if o < time.time() - time_start]
        if to_send == offsets:
            running = False
        print(f'{counter}:{to_send}')  # TODO: replace with Kafka producer
        time.sleep(1 / 1000)
        counter += 1 

    Example output (excerpt):

    677:[0.822061, 0.842061]
    678:[0.822061, 0.842061]
    679:[0.822061, 0.842061]
    692:[0.822061, 0.842061]
    693:[0.822061, 0.842061, 0.862061]
    694:[0.822061, 0.842061, 0.862061]
    720:[0.822061, 0.842061, 0.862061, 0.882061]
    721:[0.822061, 0.842061, 0.862061, 0.882061]
    722:[0.822061, 0.842061, 0.862061, 0.882061]
    723:[0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
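
    To see why the comparison has to be < rather than ==, here is a small deterministic sketch. It assumes, hypothetically, that the polling loop observes the elapsed time in steps of exactly 1.3 ms; the step size is made up, but any sampling that doesn't land exactly on the offset behaves the same way.

```python
# Hypothetical elapsed times seen by a polling loop that wakes every 1.3 ms.
STEP = 0.0013
samples = [i * STEP for i in range(1000)]

offset = 0.822061  # first offset from the sample data

# An exact-equality check never fires: no sample lands exactly on the offset.
exact_hits = [t for t in samples if t == offset]

# A "has it passed yet?" check fires on the first sample after the offset.
first_due = next(t for t in samples if offset < t)

print(exact_hits)           # []
print(round(first_due, 4))  # 0.8229
```

    The == version would spin forever waiting for a value it already skipped; the < version just fires about half a millisecond late.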

    If you only want the data that became due between two checks, you'll need to store the timestamp each time through the loop. For example, when you reach 0.822061, pop it and subtract it from the remaining values in the list (e.g. 0.842061 - 0.822061 = 0.02), then reset the start time. The loop keeps running, and after another 0.02 seconds you pop the next event the same way as the first, and so on.

    I couldn't make a good example of that without duplicating offset values, so here is another solution: remove each element from the list once it is due.

    import time
    offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
    time_start = time.time()
    counter = 0  # for debugging
    while offsets:  # stop once every offset has been sent
        elapsed = time.time() - time_start
        # buffer the offsets that have come due since the last pass
        to_send = [o for o in offsets if o < elapsed]
        # drop them from the main list so each one is sent only once
        offsets = [o for o in offsets if o >= elapsed]
        print(f'{counter}:{to_send}')  # TODO: replace with Kafka producer
        time.sleep(1 / 1000)
        counter += 1
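
    The pop-and-reset idea (subtract each offset from the next one and restart the clock) can also be written without a polling loop: compute the gap between consecutive offsets and sleep for exactly that long. A minimal sketch, assuming events may be replayed in sorted offset order; `gaps`, `replay_by_gaps` and the `send` callback are hypothetical names, not part of any library.

```python
import time
from typing import Callable, List


def gaps(offsets: List[float]) -> List[float]:
    """Turn absolute offsets into the wait before each event, in sorted order."""
    previous = 0.0
    result = []
    for o in sorted(offsets):
        result.append(o - previous)  # e.g. 0.842061 - 0.822061 = 0.02
        previous = o
    return result


def replay_by_gaps(offsets: List[float], send: Callable[[float], None]) -> None:
    """Sleep for each gap, then pass the event to `send` (hypothetical callback)."""
    for o, wait in zip(sorted(offsets), gaps(offsets)):
        time.sleep(wait)  # the clock effectively restarts after every send
        send(o)


offsets = [0.822061, 0.842061, 0.862061, 0.882061, 0.902061]
print([round(g, 6) for g in gaps(offsets)])  # [0.822061, 0.02, 0.02, 0.02, 0.02]
```

    Because each wait starts after the previous send, small sleep overshoots and send latency add up over a long replay; that is fine for practice data but worth knowing.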

    Regarding Kafka itself: the Datagen source connector for Kafka Connect (kafka-connect-datagen) can simulate streaming data loads.
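
    If you'd rather keep replaying your own DataFrame, one variant that avoids drift is to measure every deadline from a single start time and sleep until it, instead of busy-polling. This is a hypothetical sketch, not the answer's method: it assumes the rows are plain dicts (for example from `acc_df.to_dict('records')`) and accepts any `send` function.

```python
import time
from typing import Any, Callable, Dict, Iterable


def replay_rows(
    rows: Iterable[Dict[str, Any]],
    send: Callable[[Dict[str, Any]], None],
    offset_key: str = "offset",
) -> None:
    """Send each row once its offset (seconds since start) has elapsed.

    Every deadline is measured from one fixed start, so sleep overshoot
    on one row does not push back the rows after it.
    """
    start = time.monotonic()  # monotonic clock: unaffected by wall-clock changes
    for row in sorted(rows, key=lambda r: r[offset_key]):
        delay = start + row[offset_key] - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        send(row)


# Hypothetical rows; with pandas these could come from acc_df.to_dict("records").
rows = [
    {"offset": 0.842061, "x": -0.998, "y": 0.046, "z": -0.04},
    {"offset": 0.822061, "x": -0.994, "y": 0.045, "z": -0.036},
]
replay_rows(rows, print)  # print stands in for a Kafka producer call
```

    With the kafka-python package, `send` could be `lambda row: producer.send("accelerations", value=json.dumps(row).encode("utf-8"))` for a `producer = KafkaProducer(bootstrap_servers="localhost:9092")` (the broker address is an assumption), followed by `producer.flush()` once the replay finishes.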