Tags: python, multiprocessing, python-multiprocessing

How to detect a pressed key within a Python Process?


With a simple example I try to demonstrate a typical multiprocessing setup with two processes:

  1. To receive data (here simulated by random array generation)
  2. To send data

I want to terminate the first process by a keyboard keypress, which will send None to a queue, which then stops the program. I use the keyboard package for detecting if a key is pressed.

import multiprocessing
import keyboard
import numpy as np

def getData(queue):

    KEY_PRESSED = False

    while KEY_PRESSED is False:
    
        if keyboard.is_pressed("a"):
            queue.put(None)
            print("STOP in getData")
            KEY_PRESSED=True
        else:
            data = np.random.random([8, 250])
            queue.put(data)

def processData(queue):
    
    FLAG_STOP = False

    while FLAG_STOP is False:
        data = queue.get()  # shape: (channels, samples)
        if data is None:
            print("STOP in processData")
            FLAG_STOP = True
        else:
            print("Processing Data")
            print(str(data[0,0]))

if __name__ == "__main__":

    queue = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=getData, args=(queue,)),
        multiprocessing.Process(target=processData, args=(queue,))
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

When I debug the code, the key press is actually detected, but at the same time random data from the while loop keeps being put into the queue, which makes the code very difficult to debug.

Additionally, I tried the pynput package, which creates a thread to detect a pressed key. With this method, however, the same problem occurred: the program did not savely terminate execution by sending None to the other process.

I would be very happy if someone could point out the error in the described method, or propose another way of savely detecting a keypress within a process.


Solution

  • I am not sure what problem you are describing ("savely" is not an English word; I assume you mean "safely"). You say that the pressed key is actually detected. If so, and given the print("STOP...") statements in both functions, then when you run the code from a command prompt and getData detects that a is being pressed, I don't see how both print statements would not eventually be executed and the two processes terminate.

    If the problem is that the program does not terminate for a very long while, then I think what you are missing is this: unless the call to keyboard.is_pressed("a") is particularly slow, by the time you get around to pressing a on the keyboard, getData will already have written thousands of records to the queue before it writes None. processData must then read and print all of those records before it finally reaches the None record. Because it also has to print each number, processData cannot keep up with getData. So long after getData has written its None record, processData still has thousands of records left to read.

    This can be demonstrated in a variation of your code where function getData does not wait for keyboard input but simply writes random numbers to the output queue for 5 seconds before writing its None record and terminating (this simulates your program where you wait 5 seconds before pressing a). Function processData prints the number of records it has read before it gets to the None record and the elapsed time it took to read and print those records:

    import multiprocessing
    import numpy as np
    import time
    
    def getData(queue):
    
        KEY_PRESSED = False
    
        expiration = time.time() + 5
        # Run for 5 seconds:
        while KEY_PRESSED is False:
    
            if time.time() > expiration:
                queue.put(None)
                print("STOP in getData")
                KEY_PRESSED=True
            else:
                data = np.random.random([8, 250])
                queue.put(data)
    
    def processData(queue):
    
        FLAG_STOP = False
    
        t = time.time()
        cnt = 0
        while FLAG_STOP is False:
            data = queue.get()  # shape: (channels, samples)
            if data is None:
                print("STOP in processData")
                print('Number of items read from queue:', cnt, 'elapsed_time:', time.time() - t)
                FLAG_STOP = True
            else:
                cnt += 1
                print("Processing Data")
                print(str(data[0,0]))
    
    if __name__ == "__main__":
    
        queue = multiprocessing.Queue()
        processes = [
            multiprocessing.Process(target=getData, args=(queue,)),
            multiprocessing.Process(target=processData, args=(queue,))
        ]
    
        for p in processes:
            p.start()
    
        for p in processes:
            p.join()
    

    Prints:

    ...
    Processing Data
    0.21449036510257957
    Processing Data
    0.27058883046461824
    Processing Data
    0.5433716680659376
    STOP in processData
    Number of items read from queue: 128774 elapsed_time: 35.389172077178955
    

    Even though getData wrote numbers for only 5 seconds, it took 35 seconds for processData to read and print them.
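
    As a rough back-of-the-envelope check of the figures above, the following sketch estimates the production rate, the consumption rate, and the backlog left at the moment getData stops. It assumes both processes started at about the same time and ran at constant rates, which is only approximately true. The function name backlog_estimate is made up for this illustration.

```python
def backlog_estimate(items_total, produce_seconds, total_seconds):
    """Estimate rates and backlog, assuming constant rates (a simplification)."""
    produce_rate = items_total / produce_seconds  # items/s written by getData
    consume_rate = items_total / total_seconds    # items/s handled by processData
    # Items still waiting in the queue when the producer writes None.
    backlog = items_total - consume_rate * produce_seconds
    # Time the consumer still needs after that moment to reach None.
    drain_seconds = backlog / consume_rate
    return produce_rate, consume_rate, backlog, drain_seconds


if __name__ == "__main__":
    # Figures taken from the run shown above.
    rates = backlog_estimate(128774, 5.0, 35.389172077178955)
    print("produced/s: %.0f  consumed/s: %.0f  backlog: %.0f  drain: %.1f s" % rates)
```

    Under these assumptions the producer writes roughly 25,800 items per second while the consumer handles roughly 3,600, so about 110,000 items are still queued when None is written, and draining them takes about 30 more seconds.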

    The problem can be resolved by putting a limit on the number of messages that can be on the Queue instance:

        queue = multiprocessing.Queue(1)
    

    This will block getData from putting the next value on the queue until processData has read the previous value.

    Prints:

    ...
    Processing Data
    0.02822635996071321
    Processing Data
    0.05390434023333657
    Processing Data
    STOP in getData
    0.9030600225686444
    STOP in processData
    Number of items read from queue: 16342 elapsed_time: 5.000030040740967
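
    The blocking behaviour of a bounded queue can also be checked in isolation, without any keyboard input or second process. The sketch below is only an illustration: the helper names try_put and bounded_queue_demo are made up here, and a short timeout stands in for "would block" so that the demo does not hang.

```python
import multiprocessing
import queue as queue_module  # provides the Full exception raised on timeout


def try_put(q, item, timeout=0.2):
    """Return True if item was enqueued within timeout seconds, else False."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue_module.Full:
        return False


def bounded_queue_demo():
    q = multiprocessing.Queue(maxsize=1)
    results = []
    results.append(try_put(q, "first"))   # queue is empty: the put succeeds
    results.append(try_put(q, "second"))  # queue is full: the put times out
    results.append(q.get(timeout=5))      # the consumer side reads one item
    results.append(try_put(q, "third"))   # the slot is free again
    q.get(timeout=5)                      # drain the queue before returning
    return results


if __name__ == "__main__":
    print(bounded_queue_demo())
```

    Without the timeout, the second put would simply wait until the item is read, which is what keeps getData in step with processData.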
    

    So if you use a queue with a maxsize of 1, your program should terminate almost immediately upon pressing the a key.
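
    To see the effect end to end without the keyboard package, here is a minimal sketch under a few assumptions: the stop condition is passed in as a callable (stop_requested, a name invented here), a plain list of random floats stands in for the NumPy array, and the producer runs in a thread rather than a separate process so the demo runs anywhere. The names get_data, process_data and run_demo are likewise illustrative.

```python
import multiprocessing
import random
import threading
import time


def get_data(queue, stop_requested):
    """Produce records until stop_requested() is true, then send None."""
    while not stop_requested():
        # Stand-in for np.random.random([8, 250]) from the question.
        queue.put([random.random() for _ in range(8)])
    queue.put(None)  # sentinel telling the consumer to stop


def process_data(queue):
    """Consume records until the None sentinel arrives; return the count."""
    count = 0
    while True:
        data = queue.get()
        if data is None:
            return count
        count += 1


def run_demo(run_seconds=0.5):
    """Run producer and consumer on a queue of maxsize 1 and time the run."""
    queue = multiprocessing.Queue(1)  # at most one unread record at a time
    start = time.monotonic()
    deadline = start + run_seconds    # simulates pressing the key at this time
    producer = threading.Thread(
        target=get_data, args=(queue, lambda: time.monotonic() > deadline)
    )
    producer.start()
    count = process_data(queue)       # consume in the calling thread
    producer.join()
    return count, time.monotonic() - start


if __name__ == "__main__":
    count, elapsed = run_demo()
    print("items read:", count, "elapsed: %.2f s" % elapsed)
```

    The elapsed time should stay close to run_seconds, because the consumer never has more than a record or so of backlog to work through. With real processes, stop_requested would presumably be a module-level function wrapping keyboard.is_pressed("a"), since a lambda may not be picklable under every process start method.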