Tags: python, multithreading, iterator, producer-consumer, sliding-window

Python time sliding window variation


I'm stuck on a variation of the sliding window problem!

Usually the window is defined by a number of elements, but in my case I want to slide over time!

My goal is a function (a thread, in this case) that creates a "time" window whose length in seconds is given by the user.

Starting from the first element of the queue, in this case [datetime.time(7, 6, 14, 537370), 584], add 5 seconds to get 7:06:19.537370 (the ending point) and sum all elements in this interval:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]

Total: 584 + 761 = 1345

Then create another window starting at the second element, and so on. IMPORTANT: one item can be part of several windows. The items are generated in the meantime, so a naive solution with a function that sleeps for n seconds and then flushes the queue does not work for my problem.

I think it's a variation of this post: Flexible sliding window (in Python)

But I still can't solve the problem! Any help or suggestions would be appreciated. Thanks!

Example list of elements:

 [datetime.time(7, 6, 14, 537370), 584]
 [datetime.time(7, 6, 18, 542798), 761]
 [datetime.time(7, 6, 20, 546007), 848]
 [datetime.time(7, 6, 24, 550969), 20]
 [datetime.time(7, 6, 27, 554370), 478]
 [datetime.time(7, 6, 27, 554628), 12]
 [datetime.time(7, 6, 31, 558919), 29]
 [datetime.time(7, 6, 31, 559562), 227]
 [datetime.time(7, 6, 32, 560863), 379]
 [datetime.time(7, 6, 35, 564863), 132]
 [datetime.time(7, 6, 37, 567276), 651]
 [datetime.time(7, 6, 38, 568652), 68]
 [datetime.time(7, 6, 40, 569861), 100]
 [datetime.time(7, 6, 41, 571459), 722]
 [datetime.time(7, 6, 44, 574802), 560]

...
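
To make the expected result concrete, the sums can be computed offline for an already collected, time-sorted list with a two-pointer pass. This is only a sketch: window_sums, at and the date 2000-01-01 are made up here (datetime.time values cannot be added to a timedelta, so full datetime objects are assumed), and it does not yet handle items that arrive while it runs.

```python
import datetime


def window_sums(items, seconds):
    """For each item, sum the values with start <= timestamp <= start + seconds.

    `items` must be sorted by timestamp. Returns a list of (start, total) pairs.
    """
    size = datetime.timedelta(seconds=seconds)
    sums = []
    right = 0    # index of the first item not yet inside the current window
    running = 0  # sum of the values currently inside the window
    for start, value in items:
        # Grow the window to the right while items still fall inside it.
        while right < len(items) and items[right][0] <= start + size:
            running += items[right][1]
            right += 1
        sums.append((start, running))
        # The next window starts one item later, so drop the current item.
        running -= value
    return sums


day = datetime.date(2000, 1, 1)  # arbitrary date, assumed only so that "+ timedelta" works


def at(h, m, s, us):
    return datetime.datetime.combine(day, datetime.time(h, m, s, us))


# First four entries of the example list.
sample = [
    (at(7, 6, 14, 537370), 584),
    (at(7, 6, 18, 542798), 761),
    (at(7, 6, 20, 546007), 848),
    (at(7, 6, 24, 550969), 20),
]

for start, total in window_sums(sample, 5):
    print(start.time(), total)
```

The first window comes out as 1345, matching the total above. The last window only contains its own item because the sample is cut off after four entries.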

Code:

 import random
 import time
 import threading
 import datetime
 from multiprocessing import Queue

 q = Queue()

 #this is a producer that puts elements in the queue

 def t1():
     while True:
         time.sleep(random.randint(0, 5))
         #build a fresh list per item; reusing one list could change items that are still queued
         element = [datetime.datetime.now().time(), random.randint(0, 1000)]
         q.put(element)


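 #this is a consumer that sums the elements inside a window of n seconds
 #I need a sliding time window that sums all elements that fall within n seconds

 def t2():
     windowsize = 5 #size of the window 5 seconds
     total = 0 #running sum (the name "sum" would shadow the built-in)
     while not q.empty():
         e = q.get()
         start = e[0] #the first element is the beginning point
         #datetime.time does not support "+ timedelta", so go through a full datetime
         start_dt = datetime.datetime.combine(datetime.date.today(), start)
         end = (start_dt + datetime.timedelta(seconds=windowsize)).time() #ending point
         total += e[1]
         #some code that solve the problem :)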



 a = threading.Thread(target=t1)
 a.start()

 b = threading.Thread(target=t2)
 b.start()

 while True:
     time.sleep(1)

Solution

  • Would this do? This is how I understood your problem. It creates a class that keeps track of the elements. You either add to it with tw.insert(element) or sum over an interval with tw.sum_window(start, end).

    When you initialise TimeWindow, you can give it a max_size parameter; the default is 10 seconds. Before every insert or sum operation it cleans up: entries older than max_size seconds, measured against the current time, are expunged. A "poller" thread is there to track your requests.

    I have added two queues, as I do not know what you intend to do with the results. If you want to request data from now to 5 seconds in the future, you create a request and put it in the input queue. The request has a random id so that you can match it to its result. Your main thread needs to monitor the result queue; once the five seconds have passed, each request sent to the input queue comes back on the result queue with the same id and the sum.

    If this is not what you want to do, then I just don't understand what it is that you are trying to achieve here. Even this is already rather complicated, and there may be a much simpler way to achieve what you intend to do.

    import random
    import time
    import threading
    import datetime
    try:
        import queue as Queue  # Python 3 name of the module
    except ImportError:
        import Queue  # Python 2 name
    import uuid
    
    from collections import deque
    
    q_lock = threading.RLock()
    
    
    class TimeWindow(object):
        def __init__(self, max_size=10):
            self.max_size = max_size
            self.q = deque()
    
        def expire(self):
            time_now = datetime.datetime.now()
            while True:
                try:
                    oldest_element = self.q.popleft()
                    oe_time = oldest_element[0]
                    if oe_time + datetime.timedelta(seconds=self.max_size) > time_now:
                        self.q.appendleft(oldest_element)
                        break
    
                except IndexError:
                    break
    
        def insert(self,elm):
            self.expire()
            self.q.append(elm)
    
        def sum_window(self, start, end):
            self.expire()
            # Sum the values whose timestamp lies strictly between start and end
            # (an empty deque simply yields 0).
            return sum(f[1] for f in self.q if start < f[0] < end)
    
    
    tw = TimeWindow()
    
    
    def t1():
        while True:
            time.sleep(random.randint(0, 3))
            element = [datetime.datetime.now(), random.randint(0,1000)]
            with q_lock:
                tw.insert(element)
    
    
    def poller(in_q, out_q):
        pending = []
        while True:
            try:
                # timeout must be passed by keyword: get(0.1) sets block=0.1 and waits indefinitely
                new_request = in_q.get(timeout=0.1)
                new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"])
                pending.append(new_request)
            except Queue.Empty:
                pass
    
            new_pending = []
            for a in pending:
                if a["end"] < datetime.datetime.now():
                    with q_lock:
                        r_sum = tw.sum_window(a["start"], a["end"])
                    r_structure = {"id": a["id"], "result": r_sum}
                    out_q.put(r_structure)
                else:
                    new_pending.append(a)
            pending = new_pending
    
    
    a = threading.Thread(target=t1)
    a.daemon = True
    a.start()
    in_queue = Queue.Queue()
    result_queue = Queue.Queue()
    
    po = threading.Thread(target=poller, args=(in_queue, result_queue,))
    po.daemon = True
    po.start()
    
    while True:
        time.sleep(1)
        newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()}
        in_queue.put(newr)
        try:
            ready = result_queue.get_nowait()  # non-blocking, same as get(False)
            print(ready)
        except Queue.Empty:
            pass
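
If the items arrive in timestamp order, a possibly simpler alternative is to let every item open its own window and to close windows as later items arrive. The sketch below is not the code above but a different technique; the class name ForwardWindows, its add method and the date 2000-01-01 are made up for illustration.

```python
import datetime
from collections import deque


class ForwardWindows:
    """Hypothetical helper: one window per item, covering [ts, ts + size]."""

    def __init__(self, size_seconds=5):
        self.size = datetime.timedelta(seconds=size_seconds)
        self.open = deque()  # [start, running_total] pairs, oldest first

    def add(self, ts, value):
        """Feed one item (timestamps must be non-decreasing).

        Returns the (start, total) pairs of the windows that ts has closed.
        """
        closed = []
        # Windows whose end lies before ts can no longer receive items.
        while self.open and self.open[0][0] + self.size < ts:
            start, total = self.open.popleft()
            closed.append((start, total))
        # The new item counts toward every window that is still open...
        for window in self.open:
            window[1] += value
        # ...and opens a window of its own.
        self.open.append([ts, value])
        return closed


# First four entries of the example list, on an arbitrary (assumed) date.
feed = [
    (datetime.datetime(2000, 1, 1, 7, 6, 14, 537370), 584),
    (datetime.datetime(2000, 1, 1, 7, 6, 18, 542798), 761),
    (datetime.datetime(2000, 1, 1, 7, 6, 20, 546007), 848),
    (datetime.datetime(2000, 1, 1, 7, 6, 24, 550969), 20),
]

fw = ForwardWindows(5)
closed = []
for ts, value in feed:
    closed.extend(fw.add(ts, value))

for start, total in closed:
    print(start.time(), total)
```

In a threaded setup the consumer could call add for each item it takes from the queue. Note that a window is only reported once a later item arrives; closing windows when the clock, rather than a new item, passes their end is left out here.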