Search code examples
pythonpython-multithreading

Timeout for python function


I'm using a lib/SDK to control a fingerprint reader.

I wrote this:

import ctypes

SDK_ERRORS = {
    -23: "Not Connected",
    -22: "SDK not started",
}

class FingerPrintReader:

    def __init__(self):
        self.sdk = None
        try:
            self.sdk = ctypes.CDLL("/usr/lib/lib_sdk.so")
            result = self.sdk.SDK_Biometric_Start()
            if result != 1:
                print(f"ERROR: {SDK_ERRORS[result]}")
            print("Sucess!")
        except OSError as error:
            print(f"[ERROR] {error}")
    
    def read_fingerprint(self) -> Union[bytes, None]:
    if self.sdk:
        template = bytes(b"\x00") * 669
        result = self.sdk.SDK_Biometric_ReadFingerPrint(template)
        if result != 1:
            print(f"ERROR: {SDK_ERRORS[result]}")
            return
        print("Sucess!")
        return template
    return

    def save_fingerprint_as_template(self, template: bytes) -> None:
        file_name = time.time()
        if template:
            with open(f"{file_name}.tpl", "wb") as template_file:
                template_file.write(template)

fingerprint = FingerPrintReader()
template = fingerprint.read_fingerprint()
fingerprint.save_fingerprint_as_template(template)

My code works, but I want to set a timeout because I need to finish read_fingerprint. I'm thinking of using threading, but I don't know because my function needs to return the template. My possible solution was to create a new function, but I don't know if this is a good idea.


Solution

  • With @relent95 suggestion, I came up with this solution:

        def read_fingerprint(self) -> Union[bytes, None]:
            def read_fingerprint_worker(queue):
            template = bytes(b"\x00") * 669
            reading = self.sdk.SDK_Biometric_ReadFingerPrint(template)
            fingerprint_result = {"reading": reading, "template": template}
            queue.put(fingerprint_result)
    
            if self.sdk:
                result_queue = multiprocessing.Queue()
                p = multiprocessing.Process(
                    target=read_fingerprint_worker, args=(result_queue,)
                )
                p.start()
                p.join(20)
                if p.is_alive():
                    p.terminate()
                    self.cancel_fingerprint_reader()
                    return None
                if not result_queue.empty():
                    result = result_queue.get()
                    if result["reading"] != 1:
                        print(f"ERROR: {SDK_ERRORS[result]}")
                        return None
                    return result["template"]
            return None