Search code examples
pythonctypessymlinkjunctionreadlink

Having trouble implementing a readlink() function


I've been trying to figure out a way to get some sort of ability to be able to return the true abspath of a symbolic link in Windows, under Python 2.7. (I cannot upgrade to 3.x, as most DCCs such as Maya/3ds max do not use that version of Python)

I've looked at the sid0 ntfs utils (whose islink() function works, but readlink() function always returns an empty unicode string for me for some reason), and the juntalis ntfs libs (which unfortunately, I couldn't get to work), along with a helpful script someone posted:

import os, ctypes, struct
from ctypes import windll, wintypes

FSCTL_GET_REPARSE_POINT = 0x900a8

FILE_ATTRIBUTE_READONLY      = 0x0001
FILE_ATTRIBUTE_HIDDEN        = 0x0002
FILE_ATTRIBUTE_DIRECTORY     = 0x0010
FILE_ATTRIBUTE_NORMAL        = 0x0080
FILE_ATTRIBUTE_REPARSE_POINT = 0x0400


GENERIC_READ  = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 3
FILE_READ_ATTRIBUTES = 0x80
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value

INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF

FILE_FLAG_OPEN_REPARSE_POINT = 2097152
FILE_FLAG_BACKUP_SEMANTICS = 33554432
# FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTI
FILE_FLAG_REPARSE_BACKUP = 35651584


GetFileAttributes = windll.kernel32.GetFileAttributesW
_CreateFileW = windll.kernel32.CreateFileW
_DevIoCtl = windll.kernel32.DeviceIoControl
_DevIoCtl.argtypes = [
    wintypes.HANDLE, #HANDLE hDevice
    wintypes.DWORD, #DWORD dwIoControlCode
    wintypes.LPVOID, #LPVOID lpInBuffer
    wintypes.DWORD, #DWORD nInBufferSize
    wintypes.LPVOID, #LPVOID lpOutBuffer
    wintypes.DWORD, #DWORD nOutBufferSize
    ctypes.POINTER(wintypes.DWORD), #LPDWORD lpBytesReturned
    wintypes.LPVOID] #LPOVERLAPPED lpOverlapped
_DevIoCtl.restype = wintypes.BOOL


def islink(path):
    # assert os.path.isdir(path), path
    if GetFileAttributes(path) & FILE_ATTRIBUTE_REPARSE_POINT:
        return True
    else:
        return False


def DeviceIoControl(hDevice, ioControlCode, input, output):
    # DeviceIoControl Function
    # http://msdn.microsoft.com/en-us/library/aa363216(v=vs.85).aspx
    if input:
        input_size = len(input)
    else:
        input_size = 0
    if isinstance(output, int):
        output = ctypes.create_string_buffer(output)
    output_size = len(output)
    assert isinstance(output, ctypes.Array)
    bytesReturned = wintypes.DWORD()
    status = _DevIoCtl(hDevice, ioControlCode, input,
                       input_size, output, output_size, bytesReturned, None)
    print "status(%d)" % status
    if status != 0:
        return output[:bytesReturned.value]
    else:
        return None


def CreateFile(path, access, sharemode, creation, flags):
    return _CreateFileW(path, access, sharemode, None, creation, flags, None)


SymbolicLinkReparseFormat = "LHHHHHHL"
SymbolicLinkReparseSize = struct.calcsize(SymbolicLinkReparseFormat);

def readlink(path):
    """ Windows readlink implementation. """
    # This wouldn't return true if the file didn't exist, as far as I know.
    assert islink(path)
    # assert type(path) == unicode

    # Open the file correctly depending on the string type.
    hfile = CreateFile(path, GENERIC_READ, 0, OPEN_EXISTING,
                       FILE_FLAG_REPARSE_BACKUP)
    # MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 16384 = (16*1024)
    buffer = DeviceIoControl(hfile, FSCTL_GET_REPARSE_POINT, None, 16384)
    windll.CloseHandle(hfile)

    # Minimum possible length (assuming length of the target is bigger than 0)
    if not buffer or len(buffer) < 9:
        return None

    # Only handle SymbolicLinkReparseBuffer
    (tag, dataLength, reserver, SubstituteNameOffset, SubstituteNameLength,
     PrintNameOffset, PrintNameLength,
     Flags) = struct.unpack(SymbolicLinkReparseFormat,
                            buffer[:SymbolicLinkReparseSize])
    print tag, dataLength, reserver, SubstituteNameOffset, SubstituteNameLength
    start = SubstituteNameOffset + SymbolicLinkReparseSize
    actualPath = buffer[start : start + SubstituteNameLength].decode("utf-16")
    # This utf-16 string is null terminated
    index = actualPath.find(u"\0")
    assert index > 0
    if index > 0:
        actualPath = actualPath[:index]
    if actualPath.startswith(u"?\\"):
        return actualPath[2:]
    else:
        return actualPath

However, most of the time various solutions I've tried end up giving me:

[Error 126] The specified module could not be found

Even though I am able to import ctypes and do stuff such as importing cdll:

libc = cdll.msvcrt
libc.printf
<_FuncPtr object at 0x0000000002A9F388>

I'm pretty new to this part of Python and ctypes in general, so any pointers on dealing with symbolic links here would be very much appreciated!


Solution

  • ERROR_MOD_NOT_FOUND (126) is likely due to windll.CloseHandle(hfile), which tries to load "closehandle.dll". It's missing kernel32.

    Here's an alternate implementation that handles junctions as well as symbolic links.

    ctypes definitions

    import ctypes
    from ctypes import wintypes
    
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    
    FILE_READ_ATTRIBUTES = 0x0080
    OPEN_EXISTING = 3
    FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
    FILE_FLAG_BACKUP_SEMANTICS   = 0x02000000
    FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
    
    IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
    IO_REPARSE_TAG_SYMLINK     = 0xA000000C
    FSCTL_GET_REPARSE_POINT    = 0x000900A8
    MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000
    
    LPDWORD = ctypes.POINTER(wintypes.DWORD)
    LPWIN32_FIND_DATA = ctypes.POINTER(wintypes.WIN32_FIND_DATAW)
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    
    def IsReparseTagNameSurrogate(tag):
        return bool(tag & 0x20000000)
    
    def _check_invalid_handle(result, func, args):
        if result == INVALID_HANDLE_VALUE:
            raise ctypes.WinError(ctypes.get_last_error())
        return args
    
    def _check_bool(result, func, args):
        if not result:
            raise ctypes.WinError(ctypes.get_last_error())
        return args
    
    kernel32.FindFirstFileW.errcheck = _check_invalid_handle
    kernel32.FindFirstFileW.restype = wintypes.HANDLE
    kernel32.FindFirstFileW.argtypes = (
        wintypes.LPCWSTR,  # _In_  lpFileName
        LPWIN32_FIND_DATA) # _Out_ lpFindFileData
    
    kernel32.FindClose.argtypes = (
        wintypes.HANDLE,) # _Inout_ hFindFile
    
    kernel32.CreateFileW.errcheck = _check_invalid_handle
    kernel32.CreateFileW.restype = wintypes.HANDLE
    kernel32.CreateFileW.argtypes = (
        wintypes.LPCWSTR, # _In_     lpFileName
        wintypes.DWORD,   # _In_     dwDesiredAccess
        wintypes.DWORD,   # _In_     dwShareMode
        wintypes.LPVOID,  # _In_opt_ lpSecurityAttributes
        wintypes.DWORD,   # _In_     dwCreationDisposition
        wintypes.DWORD,   # _In_     dwFlagsAndAttributes
        wintypes.HANDLE)  # _In_opt_ hTemplateFile 
    
    kernel32.CloseHandle.argtypes = (
        wintypes.HANDLE,) # _In_ hObject
    
    kernel32.DeviceIoControl.errcheck = _check_bool
    kernel32.DeviceIoControl.argtypes = (
        wintypes.HANDLE,  # _In_        hDevice
        wintypes.DWORD,   # _In_        dwIoControlCode
        wintypes.LPVOID,  # _In_opt_    lpInBuffer
        wintypes.DWORD,   # _In_        nInBufferSize
        wintypes.LPVOID,  # _Out_opt_   lpOutBuffer
        wintypes.DWORD,   # _In_        nOutBufferSize
        LPDWORD,          # _Out_opt_   lpBytesReturned
        wintypes.LPVOID)  # _Inout_opt_ lpOverlapped 
    
    class REPARSE_DATA_BUFFER(ctypes.Structure):
        class ReparseData(ctypes.Union):
            class LinkData(ctypes.Structure):
                _fields_ = (('SubstituteNameOffset', wintypes.USHORT),
                            ('SubstituteNameLength', wintypes.USHORT),
                            ('PrintNameOffset',      wintypes.USHORT),
                            ('PrintNameLength',      wintypes.USHORT))
                @property
                def PrintName(self):
                    dt = wintypes.WCHAR * (self.PrintNameLength //
                                           ctypes.sizeof(wintypes.WCHAR))
                    name = dt.from_address(ctypes.addressof(self.PathBuffer) +
                                           self.PrintNameOffset).value
                    if name.startswith(r'\??'):
                        name = r'\\?' + name[3:] # NT => Windows
                    return name
            class SymbolicLinkData(LinkData):
                _fields_ = (('Flags',      wintypes.ULONG),
                            ('PathBuffer', wintypes.BYTE * 0))
            class MountPointData(LinkData):
                _fields_ = (('PathBuffer', wintypes.BYTE * 0),)
            class GenericData(ctypes.Structure):
                _fields_ = (('DataBuffer', wintypes.BYTE * 0),)
            _fields_ = (('SymbolicLinkReparseBuffer', SymbolicLinkData),
                        ('MountPointReparseBuffer',   MountPointData),
                        ('GenericReparseBuffer',      GenericData))
        _fields_ = (('ReparseTag',        wintypes.ULONG),
                    ('ReparseDataLength', wintypes.USHORT),
                    ('Reserved',          wintypes.USHORT),
                    ('ReparseData',       ReparseData))
        _anonymous_ = ('ReparseData',)
    

    functions

    def islink(path):
        data = wintypes.WIN32_FIND_DATAW()
        kernel32.FindClose(kernel32.FindFirstFileW(path, ctypes.byref(data)))
        if not data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT:
            return False
        return IsReparseTagNameSurrogate(data.dwReserved0)
    
    def readlink(path):
        n = wintypes.DWORD()
        buf = (wintypes.BYTE * MAXIMUM_REPARSE_DATA_BUFFER_SIZE)()
        flags = FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS
        handle = kernel32.CreateFileW(path, FILE_READ_ATTRIBUTES, 0, None,
                    OPEN_EXISTING, flags, None)
        try:
            kernel32.DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0,
                buf, ctypes.sizeof(buf), ctypes.byref(n), None)
        finally:
            kernel32.CloseHandle(handle)
        rb = REPARSE_DATA_BUFFER.from_buffer(buf)
        tag = rb.ReparseTag
        if tag == IO_REPARSE_TAG_SYMLINK:
            return rb.SymbolicLinkReparseBuffer.PrintName
        if tag == IO_REPARSE_TAG_MOUNT_POINT:
            return rb.MountPointReparseBuffer.PrintName
        if not IsReparseTagNameSurrogate(tag):
            raise ValueError("not a link")
        raise ValueError("unsupported reparse tag: %d" % tag)
    

    example

    >>> sys.version
    '2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016, 20:53:40) 
    [MSC v.1500 64 bit (AMD64)]'
    >>> os.system(r'mklink /d spam C:\Windows')
    symbolic link created for spam <<===>> C:\Windows
    0
    >>> islink('spam')
    True
    >>> readlink('spam')
    u'C:\\Windows'
    >>> islink('C:/Documents and Settings') # junction
    True
    >>> readlink('C:/Documents and Settings')
    u'C:\\Users'
    >>> islink('C:/Users/All Users') # symlinkd
    True
    >>> readlink('C:/Users/All Users')
    u'C:\\ProgramData'