Search code examples
pythontimerpygameclocktello-drone

How could I stop or delay a Pygame timer, which is linked to an User event?


I'm new to python, but I'm trying to learn it by myself for a university course. Therefore I have to program a pygame window that represents a drone fly zone (The drone should fly automated like a search helicopter in a given area and on the window a visual representation of what happens should be displayed...).

How an older version of the pygame window look's like when the virtual drone flies.

So far so good.

Now I have to link the movements of the game-represented drone with the real Tello drone. I figured out that using threading for the movement commands to the drone is the way to go when the pygame window should run the whole time.

But now I got stuck because I have to delay the automated movements in pygame in order to wait for the drone flight time and response.

I update the pygame window with a clock set to tick(60) in the loop of the game (therefore I created a Clock clock = pygame.time.Clock() in the initialization of the pygame program), but for the automated drone movements I created an Userevent. The drone movements is trigger like a Snake game via a Userevent witch should be triggered every second.

Therefor I implemented a timer:

self.SCREEN_UPDATE = pygame.USEREVENT

self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 1000).

But now I have to delay the automated movements in pygame every time the drone should do a movement. Otherwise, the pygame program run's too fast for the real-world problem. (The User event gets starter again before the real drone has finished his command)

I also tried to implement break functions for that, but they do not work.

   def break_rotation(self):
        print("start waiting")
        self.screen_update.time.wait(2000)
        #self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 2000)
        #pygame.time.wait(2000)
        print("end waiting")``
   def break_move(self):
        print("start waiting")
        self.screen_update.time.wait(3000)
        #self.screen_update = pygame.time.set_timer(self.SCREEN_UPDATE, 3000)
        #pygame.time.wait(3000)
        print("end waiting")

I also tried the comment lines. pygame.time.wait()makes the whole pygame wait. But You should still be able to use your keyboard for emergency landings and other stuff.

So I only have to delay the next Userevent, where the hole flypath for the next block / section get's tested for the automated flypath and the program decides to fly forward or should it turn.

Does anyone have an Idear how I could achieve that delay on the timer? Sadly I also couldn't find some similar problems.

EDIT 1:

@CmdCoder858

I think this code may hold the update back for a couple of seconds. When using the and operator combined with a passed variable. if event.type == SCREEN_UPDATE and x == 1:

I also tried using multiple events but that didn't work probably. if event.type == SCREEN_UPDATE and event.type == drone_task_finished: Interestingly it ignored the hole and operator this way.

import pygame
import threading
import _thread
import time



def task():
    global x
    # placeholder for drone task
    print('Test thread start')
    time.sleep(5)  # simulate delay in the task
    x = x+1
    print('Thread ends')
    pygame.event.post(pygame.event.Event(drone_task_finished))


if __name__ == '__main__':
    pygame.init()
    x = 0
    drone_task_finished = pygame.USEREVENT  # an integer representing the first user event
    drone_task = threading.Thread(target=task)  # create a thread with a task function
    drone_task.start()
    pygame.display.set_caption("Overview of the Search Area")
    window = pygame.display.set_mode((500, 300))

    SCREEN_UPDATE = pygame.USEREVENT
    screen_update = pygame.time.set_timer(SCREEN_UPDATE, 1000)

    print('Start event loop')


    while 1:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                exit()

            if event.type == SCREEN_UPDATE and x == 1:
                print('Hura Timer paused for 5 sec')
                x = 0
                drone_task = threading.Thread(target=task)  # create a thread with a task function
                drone_task.start()

            if event.type == SCREEN_UPDATE:
                print('Update nach 1sec')

            if event.type == drone_task_finished:  # detect if a drone_task_finished event has been posted
                print('Drone task has finished')

But is there any reason not to use import _thread?

With that, I could call the function multiple times in new threads. Therefore I have to change the task function to def task(threadName):

Now I could simply use _thread.start_new_thread(task, ("",)) every time the task should be used later on.


Solution

  • From what is sounds like, I would guess you want to wait for an event in another thread whilst keeping the main loop running, in which case I think you should try using pygame.event.post() function. This function is apparently thread safe so as long as it is called from a thread that was started after pygame.init() it should definitely work. You can create a custom event using the pygame.event.custom_type() function, meaning that you can wait for the event to be returned without having to pause the display. Comment below if you have any questions.

    Edit:

    Adding a code example for clarity:

    import pygame
    import threading
    import time
    
    
    def task():
        # placeholder for drone task
        time.sleep(3)  # simulate delay in the task
        pygame.event.post(pygame.event.Event(drone_task_finished))
    
    if __name__ == '__main__':
        pygame.init()
        drone_task_finished = pygame.USEREVENT  # an integer representing the first user event
        drone_task = threading.Thread(target=task)  # create a thread with a task function
        drone_task.start()
        print('Start event loop')
        while 1:
            for event in pygame.event.get():
                if event.type == drone_task_finished:  # detect if a drone_task_finished event has been posted
                    print('Drone task has finished')
    

    Essentially what is happening here is we are creating a task to be completed, then starting the task and continuing with the event loop. Once the task has finished, it will use the pygame.event.post function to tell the main thread that its task has been completed. The main thread can then detect this and proceed accordingly.

    Edit 2: To answer your questions, firstly don't worry about creating an answer to your own question, just editing your original question and mentioning that you have done so in a comment is perfectly fine.

    As for using the variable x as a way to delay the execution of another event, this can work but I would recommend keeping x out of the threaded task function, and instead modify it when your main thread detects the drone_task_finished event. This is much better because you don't have to deal with cross-thread variables, so just put that under the if event.type == drone_task_finished: block.

    As for using multiple custom events, the reason your code event.type == SCREEN_UPDATE and event.type == drone_task_finished returned True is because that statement is correct, and this is due to how you initialised each event type. You essentially used this code:

    drone_task_finished = pygame.USEREVENT
    SCREEN_UPDATE = pygame.USEREVENT
    

    However the actual way this should be done is like this:

    drone_task_finished = pygame.USEREVENT
    SCREEN_UPDATE = pygame.USEREVENT + 1
    

    This is due to how pygame handles event types. Each type is given an integer value, and the built in types get the values up until a certain point. The pygame.USEREVENT constant is an integer representing the first number that is not reserved for other events. This means that creating a new variable to represent an event in pygame should be the integer pygame.USEREVENT, and the second would have the value pygame.USEREVENT + 1. All of this is explained in more detail here.

    Finally, as explained in this answer, _thread is an implementation module that should generally be avoided in favour of the threading module, which is what I used in my code. With the threading module, you can create and start a new thread using this code:

    drone_task = threading.Thread(target=task)
    drone_task.start()
    

    The threading.Thread class also has an optional arguments parameter, meaning it is easy to pass in all the arguments you need to the new thread. You can find more information on how this all works in the documentation.