Search code examples
python python-3.x azure-data-lake azure-data-lake-gen2

How to upload a file to ADLS Gen2 using Python


I am trying to upload a file from my local code repository to ADLS:

import requests
import json
def auth(tenant_id, client_id, client_secret):
    """Request a client-credentials token scoped to Azure Storage.

    Posts the client id and secret as a URL-encoded form to the tenant's
    v2.0 token endpoint.

    Returns:
        A ``(status_code, payload)`` tuple, where ``payload`` is the response
        body parsed as JSON.
    """
    print('auth')
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    form_fields = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope" : "https://storage.azure.com/.default",
        "grant_type" : "client_credentials"
    }
    response = requests.post(
        token_url,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        data=form_fields,
    )
    payload = json.loads(response.text)
    return response.status_code, payload

def mkfs(account_name, fs_name, access_token):
    """Send a PUT with ``resource=filesystem`` for ``fs_name`` on the account.

    Returns:
        A ``(status_code, body_text)`` tuple taken from the response.
    """
    print('mkfs')
    url = f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem"
    bearer = {"Authorization": f"Bearer {access_token}"}
    response = requests.put(url, headers=bearer)
    return response.status_code, response.text

def mkdir(account_name, fs_name, dir_name, access_token):
    """Send a PUT with ``resource=directory`` for ``dir_name`` inside ``fs_name``.

    Returns:
        A ``(status_code, body_text)`` tuple taken from the response.
    """
    print('mkdir')
    url = f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory"
    bearer = {"Authorization": f"Bearer {access_token}"}
    response = requests.put(url, headers=bearer)
    return response.status_code, response.text
    
def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    """Send a PUT with ``resource=file`` for ``dir_name/file_name`` in ``fs_name``.

    Returns:
        A ``(status_code, body_text)`` tuple taken from the response.
    """
    print('touch_file')
    url = f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file"
    bearer = {"Authorization": f"Bearer {access_token}"}
    response = requests.put(url, headers=bearer)
    return response.status_code, response.text

def append_file(account_name, fs_name, path, content, position, access_token):
    """Send ``content`` in a PATCH with ``action=append`` at ``position``.

    The request carries a bearer token, a ``text/plain`` content type and a
    ``Content-Length`` derived from ``len(content)``.

    Returns:
        A ``(status_code, body_text)`` tuple taken from the response.
    """
    print('append_file')
    url = f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}"
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "text/plain",
        "Content-Length": str(len(content)),
    }
    response = requests.patch(url, headers=request_headers, data=content)
    return response.status_code, response.text
    
def flush_file(account_name, fs_name, path, position, access_token):
    """Send a body-less PATCH with ``action=flush`` at ``position`` for ``path``.

    Returns:
        A ``(status_code, body_text)`` tuple taken from the response.
    """
    print('flush_file')
    url = f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}"
    bearer = {"Authorization": f"Bearer {access_token}"}
    response = requests.patch(url, headers=bearer)
    return response.status_code, response.text

def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    """Upload a local file to ``dir_name/file_name`` inside ``fs_name``.

    Creates the remote file, appends the entire local file at position 0,
    then flushes at the final length. If any step reports a failure status,
    the response body is printed and the remaining steps are skipped.

    Args:
        account_name: Storage account name used to build the request URLs.
        fs_name: Filesystem name.
        dir_name: Directory holding the remote file.
        file_name: Name of the remote file to create.
        local_file_name: Path of the local file whose bytes are uploaded.
        access_token: Bearer token passed through to each request.

    Returns:
        None.
    """
    print('mkfile')
    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code != 201:
        print(result)
        return

    # Read up front so the local handle is not kept open during the network calls.
    with open(local_file_name, 'rb') as local_file:
        content = local_file.read()

    path = f"{dir_name}/{file_name}"

    status_code, result = append_file(account_name, fs_name, path, content, 0, access_token)
    if not 200 <= status_code < 300:
        # Flushing after a rejected append would commit a file without its data,
        # so surface the error and stop here.
        print(result)
        return

    status_code, result = flush_file(account_name, fs_name, path, len(content), access_token)
    if not 200 <= status_code < 300:
        print(result)
        
    
if __name__ == '__main__':
    # Service-principal credentials: replace the placeholders with real values.
    # They are quoted so that the script is valid Python as written.
    tenant_id = '<tenant_id>'
    client_id = '<client_id>'
    client_secret = '<client_secret>'

    account_name = 'adlsprod'
    fs_name = 'fs-adls'
    dir_name = 'FileStore'
    file_name = "abc.py"
    local_file_name = "__main__.py"

    # Acquire an access token. Without one every later call is rejected,
    # so stop here and show the auth error instead of continuing with ''.
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    if auth_status_code != 200:
        raise SystemExit(f"auth failed ({auth_status_code}): {auth_result}")
    access_token = auth_result['access_token']
    # The bearer token is deliberately not printed: it is a credential and
    # would otherwise end up in terminal scrollback or logs.

    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)

    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)

    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)

But I am getting the below error:

mkfs
403 {"error":{"code":"AuthorizationFailure","message":"This request is not authorized to perform this operation.\nRequestId:183befbd-f01f-00f7-562e-d8a0b8000000\nTime:2022-10-04T20:19:34.5637539Z"}}

I think the placement of fs_name and account_name in the URL is not correct.

This is the location of a sample file in adls -

abfss://fs-adls@adlsprod.dfs.core.windows.net/FileStore/abc.txt

I have picked the account_name and fs_name from this URL. I don't understand where I am going wrong. Is there any other way of uploading the file to an ADLS folder using Python?


Solution

  • I'm sure you can do this with the plain requests package, but the Azure Python SDK makes a lot of this much easier: https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python

    In the example:

    def upload_file_to_directory():
        """Upload a local file into "my-directory" of "my-file-system".

        Relies on a ``service_client`` defined outside this snippet
        (presumably a DataLakeServiceClient; confirm against the caller).
        Any exception raised along the way is printed, not propagated.
        """
        try:
            file_system_client = service_client.get_file_system_client(file_system="my-file-system")
            directory_client = file_system_client.get_directory_client("my-directory")
            file_client = directory_client.create_file("uploaded-file.txt")
            # Binary mode keeps len() equal to the number of bytes sent; in text
            # mode it counts characters, which differs for non-ASCII content.
            # The with-block also guarantees the handle is closed.
            with open("C:\\file-to-upload.txt", 'rb') as local_file:
                file_contents = local_file.read()
            file_client.append_data(data=file_contents, offset=0, length=len(file_contents))
            file_client.flush_data(len(file_contents))
        except Exception as e:
            print(e)
    

    You can choose between the account key, which looks like what you are doing, or Azure Active Directory (which may be easier on a local machine)