Tags: python, hadoop, ssh, paramiko, pscp

How to run PSCP cmd window step in my Python script


I am running Hadoop MapReduce and other SSH commands from a Python script using the paramiko module (code can be seen here). Once the MapReduce job is complete, I run the getmerge step to get the output into a text file.

The problem is, I then have to open a cmd window and run PSCP to download the output.txt file from the HDFS environment to my computer. For example:

pscp xxxx@xx.xx.xx.xx:/nfs_home/appers/cnielsen/MROutput_121815_0.txt C:\Users\cnielsen\Desktop\MR_Test

How can I incorporate this pscp step into my script so that I don't have to open a cmd window to run this after my MapReduce and getmerge tasks are complete? I would like my script to be able to run the MR task, getmerge task, and then automatically save the MR output to my computer.
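For reference, the same transfer can be scripted directly by calling pscp through subprocess, since pscp accepts the password on the command line with -pw. This is only a sketch with placeholder credentials and paths, and it assumes pscp.exe is on the PATH; the accepted solution below takes a different route and uses the scp module instead.

    import subprocess

    # Run the same pscp transfer from Python (password, remote path, and
    # local folder are placeholders).
    subprocess.call([
        'pscp', '-pw', 'your_password',
        'xxxx@xx.xx.xx.xx:/nfs_home/appers/cnielsen/MROutput_121815_0.txt',
        r'C:\Users\cnielsen\Desktop\MR_Test',
    ])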

My full script is available at the link above.


Solution

  • I have solved this problem with the following code. The trick was to use the scp module and its SCPClient class. See the scp_download(ssh) function below.

    When the MapReduce job completes, the getmerge command is run, followed by the scp_download step.
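    Stripped to its essentials, the approach is: open a paramiko SSH connection, hand its transport to SCPClient, and call get(). A minimal sketch with placeholder host, credentials, and paths (the full working script follows):

    import paramiko
    from scp import SCPClient

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect('xx.xx.xx.xx', 22, 'username', 'password')

    scp = SCPClient(ssh.get_transport())
    scp.get('/nfs_home/appers/cnielsen/test_011416_3.txt', r'C:\Users\cnielsen\Desktop\MR_Test')
    ssh.close()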

    import paramiko
    from scp import SCPClient
    import time
    
    # Define connection info
    host_ip = 'xx.xx.xx.xx'
    user = 'xxxxxxxx'
    pw = 'xxxxxxxx'
    port = 22
    
    # Paths
    input_loc = '/nfs_home/appers/extracts/*/*.xml'
    output_loc = '/user/lcmsprod/output/cnielsen/'
    python_path = "/usr/lib/python_2.7.3/bin/python"
    hdfs_home = '/nfs_home/appers/cnielsen/'
    output_log = r'C:\Users\cnielsen\Desktop\MR_Test\MRtest011316_0.txt'
    local_dir = r'C:\Users\cnielsen\Desktop\MR_Test'  # local folder that scp_download() saves into
    
    # File names
    xml_lookup_file = 'product_lookups.xml'
    mapper = 'Mapper.py'
    reducer = 'Reducer.py'
    helper_script = 'Process.py'
    product_name = 'test1'
    output_ref = 'test65'
    target_file = 'test_011416_3.txt'
    
    # ----------------------------------------------------
    def createSSHClient(host_ip, port, user, pw):
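        # Open a paramiko SSH connection, automatically accepting unknown host keys.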
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(host_ip, port, user, pw)
        return client
    # ----------------------------------------------------
    def buildMRcommand(product_name):
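        # Assemble the hadoop-streaming command line from the configured paths and file names.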
        space = " "
        mr_command_list = [ 'hadoop', 'jar', '/share/hadoop/tools/lib/hadoop-streaming.jar',
                            '-files', hdfs_home+xml_lookup_file,
                            '-file', hdfs_home+mapper,
                            '-file', hdfs_home+reducer,
                            '-mapper', "'"+python_path, mapper, product_name+"'",
                            '-file', hdfs_home+helper_script,
                            '-reducer', "'"+python_path, reducer+"'",
                            '-input', input_loc,
                            '-output', output_loc+output_ref]
    
        MR_command = space.join(mr_command_list)
        print MR_command
        return MR_command
    # ----------------------------------------------------
    def unbuffered_lines(f):
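        # Read the channel one byte at a time and yield complete lines as they arrive,
        # so remote output can be streamed before the command finishes.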
        line_buf = ""
        while not f.channel.exit_status_ready():
            line_buf += f.read(1)
            if line_buf.endswith('\n'):
                yield line_buf
                line_buf = ""
    # ----------------------------------------------------
    def stream_output(stdin, stdout, stderr):
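        # Echo the remote command's stderr and stdout to the console and mirror both to the local log file.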
        writer = open(output_log, 'w')
        # Using line_buffer function
        for l in unbuffered_lines(stderr):
            e = '[stderr] ' + l
            print '[stderr] ' + l.strip('\n')
            writer.write(e)
    
        # gives full listing..
        for line in stdout:
            r = '[stdout]' + line
            print '[stdout]' + line.strip('\n')
            writer.write(r)
        writer.close()
    # ----------------------------------------------------
    def run_MapReduce(ssh):
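        # Launch the streaming MapReduce job and block until its output has been consumed.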
        stdin, stdout, stderr = ssh.exec_command(buildMRcommand(product_name))
        stream_output(stdin, stdout, stderr)
        return 1
    # ----------------------------------------------------
    def run_list_dir(ssh):
        list_dir = "ls "+hdfs_home+" -l"
        stdin, stdout, stderr = ssh.exec_command(list_dir)
        stream_output(stdin, stdout, stderr)
    # ----------------------------------------------------
    def run_getmerge(ssh):
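        # Merge the HDFS output part files into a single text file under hdfs_home on the remote host.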
        getmerge = "hadoop fs -getmerge "+output_loc+output_ref+" "+hdfs_home+target_file
        print getmerge
        stdin, stdout, stderr = ssh.exec_command(getmerge)
        for line in stdout:
            print '[stdout]' + line.strip('\n')
        time.sleep(1.5)
        return 1
    # ----------------------------------------------------
    def scp_download(ssh):
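        # Reuse the existing SSH transport for an SCP session and pull the merged file down to local_dir.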
        scp = SCPClient(ssh.get_transport())
        print "Fetching SCP data.."
        scp.get(hdfs_home+target_file, local_dir)
        print "File download complete."
    # ----------------------------------------------------
    def main():
        # Get the ssh connection
        global ssh
        ssh = createSSHClient(host_ip, port, user, pw)
        print "Executing command..."
    
        # Command list
        ##run_list_dir(ssh)
        ##run_getmerge(ssh)
        ##scp_download(ssh)
    
        # Run MapReduce
        MR_status = 0
        MR_status = run_MapReduce(ssh)
    
        if MR_status == 1:
            gs = 0
            gs = run_getmerge(ssh)
            if gs == 1:
                scp_download(ssh)
    
        # Close ssh connection
        ssh.close()
    
    if __name__ == '__main__':
        main()
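
    As a side note, the same download can also be done without the extra scp dependency by using paramiko's built-in SFTP client. This is just a sketch that could be dropped into the same script (it assumes the server allows SFTP, which most OpenSSH setups do):

    import os

    def sftp_download(ssh):
        # Download the merged file over SFTP instead of SCP.
        sftp = ssh.open_sftp()
        sftp.get(hdfs_home + target_file, os.path.join(local_dir, target_file))
        sftp.close()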