Search code examples
python python-3.x windows ctypes kernel32

How to Enumerate Threads using CreateToolhelp32Snapshot and Python ctypes?


This seems like it should print the thread ID of the first thread in the snapshot, but it always prints 0. What is wrong with it?

The following assumes that process ID 1234 is a real, running process.

import ctypes
from ctypes import wintypes

# Process whose threads the script is trying to list (assumed to be running).
pid = 1234

# Flag passed as the first argument of CreateToolhelp32Snapshot below.
TH32CS_SNAPTHREAD = 0x00000004

# ctypes handle to kernel32, from which the Toolhelp functions are looked up.
kernel32 = ctypes.windll.kernel32

class THREADENTRY32(ctypes.Structure):
    """ctypes record that Thread32First fills in with one thread entry."""

    _fields_ = [
        ("dwSize", wintypes.DWORD),  # set to sizeof(THREADENTRY32) before the call
        ("cntUsage", wintypes.DWORD),
        ("th32ThreadID", wintypes.DWORD),  # the value the script prints
        ("th32OwnerProcessID", wintypes.DWORD),
        ("tpBasePri", wintypes.LONG),
        ("tpDeltaPri", wintypes.LONG),
        ("dwFlags", wintypes.DWORD),
    ]

# Declare the prototype so ctypes converts the two DWORD arguments and the
# returned handle explicitly instead of relying on its defaults.
CreateToolhelp32Snapshot = kernel32.CreateToolhelp32Snapshot
CreateToolhelp32Snapshot.argtypes = (wintypes.DWORD, wintypes.DWORD, )
CreateToolhelp32Snapshot.restype = wintypes.HANDLE

# Request a thread snapshot, passing the target pid as the second argument.
# NOTE(review): the returned handle is neither validated nor closed here.
h_snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, pid)

LPTHREADENTRY32 = ctypes.POINTER(THREADENTRY32)
Thread32First = kernel32.Thread32First
Thread32First.argtypes = (wintypes.HANDLE, LPTHREADENTRY32, )
Thread32First.restype = wintypes.BOOL
thread_entry = THREADENTRY32()
# dwSize is filled in with the structure size before the entry is handed over.
thread_entry.dwSize = ctypes.sizeof(THREADENTRY32)

# Only the first entry of the snapshot is read: there is no Thread32Next loop
# and th32OwnerProcessID is never compared against pid.
if kernel32.Thread32First(h_snapshot, ctypes.byref(thread_entry)):
    print(thread_entry.th32ThreadID)

Solution

  • Listing:

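    The problem is that you only read the 1st thread in the snapshot. With TH32CS_SNAPTHREAD, the process ID argument of CreateToolhelp32Snapshot is ignored and the snapshot contains every thread in the system, so the 1st entry belongs to the (dummy) System Idle Process (which has the (dummy) PId 0, and a bunch of threads with (dummy) TId 0). You have to iterate over all the entries (Thread32First / Thread32Next) and keep the ones whose th32OwnerProcessID matches the desired PId.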

    Here's an example.

    code00.py:

    #!/usr/bin/env python
    
    import ctypes as cts
    import sys
    from ctypes import wintypes as wts
    
    
    # Flag passed to CreateToolhelp32Snapshot in main() to request a thread snapshot.
    TH32CS_SNAPTHREAD = 0x00000004
    # Failure sentinel for CreateToolhelp32Snapshot, built as a ctypes HANDLE object.
    # NOTE(review): ctypes objects do not compare equal to plain ints, so checks
    # against an int handle should go through INVALID_HANDLE_VALUE.value.
    INVALID_HANDLE_VALUE = wts.HANDLE(-1)
    # Last-error code main() treats as the normal end of the thread enumeration.
    ERROR_NO_MORE_FILES = 0x12
    
    
    class THREADENTRY32(cts.Structure):
        """ctypes record filled in by Thread32First / Thread32Next, one per thread."""

        _fields_ = (
            ("dwSize", wts.DWORD),  # main() sets this to sizeof(THREADENTRY32)
            ("cntUsage", wts.DWORD),
            ("th32ThreadID", wts.DWORD),  # thread ID printed by main()
            ("th32OwnerProcessID", wts.DWORD),  # compared with the requested PId
            ("tpBasePri", wts.LONG),
            ("tpDeltaPri", wts.LONG),
            ("dwFlags", wts.DWORD),
        )
    
    # Pointer type used for the entry argument of Thread32First / Thread32Next.
    LPTHREADENTRY32 = cts.POINTER(THREADENTRY32)
    
    
    # ctypes handle to kernel32; every function used by main() is looked up on it.
    kernel32 = cts.windll.kernel32
    
    # Explicit prototypes: argument and return types are declared for each
    # function rather than left to ctypes' defaults.
    CreateToolhelp32Snapshot = kernel32.CreateToolhelp32Snapshot
    CreateToolhelp32Snapshot.argtypes = (wts.DWORD, wts.DWORD)
    CreateToolhelp32Snapshot.restype = wts.HANDLE
    
    # Both iteration functions take the snapshot handle plus a pointer to the
    # entry to fill in, and report success as a BOOL.
    Thread32First = kernel32.Thread32First
    Thread32First.argtypes = (wts.HANDLE, LPTHREADENTRY32)
    Thread32First.restype = wts.BOOL
    
    Thread32Next = kernel32.Thread32Next
    Thread32Next.argtypes = (wts.HANDLE, LPTHREADENTRY32)
    Thread32Next.restype = wts.BOOL
    
    # Used by main() to release the snapshot handle.
    CloseHandle = kernel32.CloseHandle
    CloseHandle.argtypes = (wts.HANDLE,)
    CloseHandle.restype = wts.BOOL
    
    # Takes no arguments; main() uses it to report why a call failed.
    GetLastError = kernel32.GetLastError
    GetLastError.argtypes = ()
    GetLastError.restype = wts.DWORD
    
    
    def main(*argv):
        """List thread IDs from a Toolhelp thread snapshot.

        Args:
            *argv: Command-line arguments. When argv[0] consists only of decimal
                digits it is the process ID to filter on; otherwise every
                enumerated thread is printed together with its owner PId.

        Returns:
            -1 when the snapshot could not be created, None otherwise.
        """
        pid = int(argv[0]) if argv and argv[0].isdecimal() else None
        print(f"Searching threads for PId: {'ANY' if pid is None else pid}")
        snap = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0)
        # With a HANDLE restype the call returns a plain int, and a ctypes object
        # never compares equal to an int, so the sentinel has to be unwrapped
        # with .value or the failure branch could never be taken.
        if snap == INVALID_HANDLE_VALUE.value:
            print(f"CreateToolhelp32Snapshot failed: {GetLastError()}")
            return -1
        try:
            entry = THREADENTRY32()
            entry.dwSize = cts.sizeof(THREADENTRY32)
            idx = 0
            res = Thread32First(snap, cts.byref(entry))
            while res:
                if pid is None:
                    print(f"  TId: {entry.th32ThreadID} (PId: {entry.th32OwnerProcessID})")
                elif entry.th32OwnerProcessID == pid:
                    print(f"  TId: {entry.th32ThreadID}")
                # idx counts every enumerated thread, not only the matching ones.
                idx += 1
                res = Thread32Next(snap, cts.byref(entry))
            # Read the error code before printing: console/file I/O done by print
            # may overwrite the calling thread's last-error value.
            gle = GetLastError()
            print(f"Enumerated {idx} threads")
            if gle != ERROR_NO_MORE_FILES:
                print(f"Error: {gle}")
        finally:
            # Release the snapshot even if printing or iteration raised.
            CloseHandle(snap)
    
    
    if __name__ == "__main__":
        # Banner: interpreter version collapsed onto one line, pointer width
        # (zero-padded to three digits) and platform name.
        version = " ".join(part.strip() for part in sys.version.split("\n"))
        bits = 64 if sys.maxsize > 0x100000000 else 32
        print(f"Python {version} {bits:03d}bit on {sys.platform}\n")
        # Forward the command-line arguments (script name excluded) and use
        # whatever main() returns as the process exit status.
        rc = main(*sys.argv[1:])
        print("\nDone.\n")
        sys.exit(rc)
    

    Output:

    [cfati@CFATI-5510-0:e:\Work\Dev\StackExchange\StackOverflow\q078690213]> "e:\Work\Dev\VEnvs\py_pc064_03.10_test0\Scripts\python.exe" ./code00.py 30516
    Python 3.10.11 (tags/v3.10.11:7d4cc5a, Apr  5 2023, 00:38:17) [MSC v.1929 64 bit (AMD64)] 064bit on win32
    
    Searching threads for PId: 30516
      TId: 9480
      TId: 28000
    Enumerated 4831 threads
    
    Done.