Search code examples
pythonwinapisingletonmutex

Python: Application Must Wait Until Another Finishes (Mutex)


I'm creating an application that must run one at a time, so I tried to create it based on this recipe: https://code.activestate.com/recipes/474070-creating-a-single-instance-application/

The problem is, I need the application to wait the another one finishes and then run.

I tried something like this:

def __init__(self) -> None:
    while True:
        lastError = 0
        SetLastError(0)
        self.Mutex = CreateMutex(None, False, self.MUTEX_NAME)
        lastError = GetLastError()
        if (lastError != ERROR_ALREADY_EXISTS):
            break
        else:
            print("It's already running... Waiting...")
            time.sleep(5)

def __del__(self) -> None:
    if self.Mutex:
        CloseHandle(self.Mutex)

When I run this code in "debug mode" on VSCode line by line, it works, because GetLastError return 0 after the first process finishes.

But when I just run twice like python app.py, the second process always print "It's already running... Waiting..."

How could I solve it?


Solution

  • The main problem is not owning the created mutex in the process that created it. Here's working code:

    import win32event as evt
    import win32api as api
    
    ERROR_ALREADY_EXISTS = 183  # Not defined in pywin32.
    
    class SingleInstance:
        def __init__(self):
            # Own mutex if created, handle returned in any case.
            self.Mutex = evt.CreateMutex(None, True, 'Global\\MyMutex')
            if api.GetLastError() == ERROR_ALREADY_EXISTS:
                print('Waiting for other instance')
                # Wait for the mutex to be released
                wait_result = evt.WaitForSingleObject(self.Mutex, evt.INFINITE)
                # If the other instance is closed without releasing the mutex,
                # WAIT_ABANDONED is returned.  Make sure to check for both.
                if wait_result in (evt.WAIT_OBJECT_0, evt.WAIT_ABANDONED):
                    # Mutex is released, can proceed
                    print('Continuing')
    
        def release(self):
            if self.Mutex is not None:
                evt.ReleaseMutex(self.Mutex)
                api.CloseHandle(self.Mutex)
                self.Mutex = None
    
        def __del__(self):  # called when instance goes out-of-scope
            self.release()
    
        def __enter__(self):  # called when created with "with"
            return self
    
        def __exit__(self, typ, value, tb):  # called when leaving "with" block
            self.release()
    
    with SingleInstance():
        input('Application running. Press ENTER to exit:')
    

    Note that owning the mutex isn't needed if the second instance simply exits (as in the ActiveState example) because in that case creating the mutex indicates it is the only running instance. Since the second instance remains running the mutex must be owned so WaitForSingleObject can actually wait for it be released when the first instance exits.