
Python: How to import a list of files in a directory from HDFS


I am trying to import a list of files from a directory on HDFS in Python.

This is what I do with local files; how can I do the same from HDFS?

import glob
import pandas as pd

path = r'/my_path'
allFiles = glob.glob(path + "/*.csv")

df_list = []
for file_ in allFiles:
    df = pd.read_csv(file_, index_col=None, header=0, sep=';')
    df_list.append(df)

I think subprocess.Popen could do the trick, but how do I extract only the file names?

import subprocess
p = subprocess.Popen("hdfs dfs -ls /my_path/ ",
    shell=True,
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT)


for line in p.stdout.readlines():
    print(line)

The output looks like this:

b'Found 32 items\n'
b'-rw-------   3 user hdfs   42202621 2019-01-21 10:05 /my_path/file1.csv\n'
b'-rw-------   3 user hdfs   99320020 2019-01-21 10:05 /my_path/file2.csv\n'
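
One idea (just an untested sketch on my side) would be to skip the "Found N items" header line and take the last whitespace-separated field of each remaining line, but I don't know if this is the right approach:

import subprocess

# sketch: list /my_path/ and keep only the *.csv paths from the ls output
out = subprocess.check_output("hdfs dfs -ls /my_path/", shell=True, text=True)

csv_paths = []
for line in out.splitlines():
    if line.startswith("Found"):      # skip the "Found 32 items" header
        continue
    hdfs_path = line.split()[-1]      # the last field is the full HDFS path
    if hdfs_path.endswith(".csv"):
        csv_paths.append(hdfs_path)

print(csv_paths)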

Solution

  • Disclaimer: This will be long and tedious. But given the circumstances, I'll try to make it as general and reproducible as possible.


    Given the requirement of no external libraries (except for pandas?), there isn't much of a choice. I suggest utilizing WebHDFS as much as possible.

    AFAIK, an HDFS installation includes WebHDFS by default. The following solution relies heavily on WebHDFS.

    First Step

    To begin with, you must know your WebHDFS URL. WebHDFS runs on the HDFS NameNode(s), and the default port is 50070.

    Therefore, we start with http://[namenode_ip]:50070/webhdfs/v1/, where /webhdfs/v1/ is the common URL prefix for all requests.

    For the sake of example, let's assume it is http://192.168.10.1:50070/webhdfs/v1.
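
    If it helps, here is that base URL spelled out as a tiny sketch (the IP is a placeholder for your own namenode; the later snippets simply hardcode it):

    namenode_ip = '192.168.10.1'   # placeholder: your (active) namenode IP
    webhdfs_port = 50070           # default WebHDFS port
    webhdfs_base = 'http://%s:%d/webhdfs/v1' % (namenode_ip, webhdfs_port)
    # -> 'http://192.168.10.1:50070/webhdfs/v1'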

    Second Step

    Ordinarily, one can use curl to list the contents of an HDFS directory. For a detailed explanation, refer to WebHDFS REST API: List a Directory.

    If you were to use curl, the following returns the FileStatuses of all the files inside a given directory.

    curl "http://192.168.10.1:50070/webhdfs/v1/<PATH>?op=LISTSTATUS"
                 ^^^^^^^^^^^^ ^^^^^             ^^^^  ^^^^^^^^^^^^^
                 Namenode IP  Port              Path  Operation
    

    As mentioned, this returns the FileStatuses as a JSON object:

    {
      "FileStatuses":
      {
        "FileStatus":
        [
          {
            "accessTime"      : 1320171722771,
            "blockSize"       : 33554432,
            "group"           : "supergroup",
            "length"          : 24930,
            "modificationTime": 1320171722771,
            "owner"           : "webuser",
            "pathSuffix"      : "a.patch",
            "permission"      : "644",
            "replication"     : 1,
            "type"            : "FILE"
          },
          {
            "accessTime"      : 0,
            "blockSize"       : 0,
            "group"           : "supergroup",
            "length"          : 0,
            "modificationTime": 1320895981256,
            "owner"           : "szetszwo",
            "pathSuffix"      : "bar",
            "permission"      : "711",
            "replication"     : 0,
            "type"            : "DIRECTORY"
          },
          ...
        ]
      }
    }
    

    The same result can be achieved from Python using the requests library:

    import requests
    
    my_path = '/my_path'
    curl = requests.get('http://192.168.10.1:50070/webhdfs/v1%s?op=LISTSTATUS' % my_path)
    

    And as shown above, the actual status of each file is two levels down in the resulting JSON. In other words, to get the FileStatus of each file:

    curl.json()['FileStatuses']['FileStatus'] 
    
    [
      {
        "accessTime"      : 1320171722771,
        "blockSize"       : 33554432,
        "group"           : "supergroup",
        "length"          : 24930,
        "modificationTime": 1320171722771,
        "owner"           : "webuser",
        "pathSuffix"      : "a.patch",
        "permission"      : "644",
        "replication"     : 1,
        "type"            : "FILE"
      },
      {
        "accessTime"      : 0,
        "blockSize"       : 0,
        "group"           : "supergroup",
        "length"          : 0,
        "modificationTime": 1320895981256,
        "owner"           : "szetszwo",
        "pathSuffix"      : "bar",
        "permission"      : "711",
        "replication"     : 0,
        "type"            : "DIRECTORY"
      },
      ...
    ]
    

    Third Step

    Since you now have all the information you need, all that is left is parsing.

    import os
    
    file_paths = []
    for file_status in curl.json()['FileStatuses']['FileStatus']:
        # 'pathSuffix' is the file name within the queried directory
        file_name = file_status['pathSuffix']
        # the if statement is only needed when the directory also contains unwanted (non-CSV) files
        if file_name.endswith('.csv'):
            # os.path.join ensures the result is an absolute path
            file_paths.append(os.path.join(my_path, file_name))
    
    file_paths
    ['/my_path/file1.csv',
     '/my_path/file2.csv',
     ...]
    

    Final Step

    Now that you know the file paths and the WebHDFS URL pattern, pandas.read_csv can handle the rest of the work.

    import pandas as pd
    
    dfs = []
    web_url = "http://192.168.10.1:50070/webhdfs/v1/%s?op=OPEN"
    #                                                  ^^^^^^^
    #                                    Operation is now OPEN
    for file_path in file_paths:
        file_url = web_url % file_path
        # http://192.168.10.1:50070/webhdfs/v1/my_path/file1.csv?op=OPEN
        dfs.append(pd.read_csv(file_url))
    

    And there you go: all the CSVs are imported and collected in dfs.
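
    If you eventually want a single DataFrame instead of a list, standard pandas concatenation finishes the job:

    df = pd.concat(dfs, ignore_index=True)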

    Warnings

    If your HDFS is configured for HA (High Availability), there will be multiple namenodes, so namenode_ip must be set accordingly: it must be the IP of the active node.
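
    A minimal sketch of how you might pick the active namenode (the IPs below are placeholders and the helper name is just for illustration; it simply uses the first candidate that answers a LISTSTATUS probe with HTTP 200, since a standby namenode should reject read operations):

    import requests

    # placeholder IPs -- replace with your actual namenode addresses
    namenode_candidates = ['192.168.10.1', '192.168.10.2']

    def pick_active_namenode(candidates, probe_path='/'):
        for ip in candidates:
            url = 'http://%s:50070/webhdfs/v1%s?op=LISTSTATUS' % (ip, probe_path)
            try:
                # only the active namenode should answer this probe with HTTP 200
                if requests.get(url, timeout=5).status_code == 200:
                    return ip
            except requests.exceptions.RequestException:
                continue  # node unreachable, try the next candidate
        raise RuntimeError('no active namenode found among the candidates')

    namenode_ip = pick_active_namenode(namenode_candidates)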