
How can I write MapReduce code in Python to transpose a matrix?


Assume the input file is a .txt file and I want to test the job on a cluster (such as EMR on AWS).


Solution

  • The problem is that what you want requires the order of the lines to be preserved, meaning

    if you have:
    a,b,c
    d,e,f
    g,h,i

    you want the output to be
    a,d,g
    b,e,h
    c,f,i
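For reference, on a single machine where the whole matrix fits in memory, the desired result is just `zip(*rows)`. This is a minimal sketch for checking a cluster job's output on a small sample; `transpose_text` is a hypothetical helper name, and it assumes a rectangular, comma-separated matrix (`zip` silently truncates ragged rows).

```python
def transpose_text(text):
    """Transpose a comma-separated matrix held entirely in memory."""
    rows = [line.split(",") for line in text.splitlines() if line.strip()]
    # zip(*rows) pairs up the i-th field of every row, i.e. column i
    return "\n".join(",".join(column) for column in zip(*rows))


sample = "a,b,c\nd,e,f\ng,h,i"
print(transpose_text(sample))
```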

    But MapReduce does not work like this. The framework splits the input file across different map tasks, and nothing guarantees the order in which their records reach the reducers. I could give you code that produces something like the transpose, meaning

    it can be:
    a,g,d
    h,e,b
    i,c,f

    because all the values of one column will be sent to the same reducer, but the order of the lines is not kept.
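A toy simulation can make this concrete. It is not Hadoop itself: shuffling the processing order of the input lines stands in for splits being handled by different map tasks in no fixed order, and grouping by column index plays the role of the shuffle phase. `naive_transpose` is a hypothetical helper name. Each output row ends up with the right values for its column, but in an order that depends on the run.

```python
import random
from collections import defaultdict


def naive_transpose(rows, seed):
    """Toy model of a job whose records carry no line index."""
    rng = random.Random(seed)
    order = list(range(len(rows)))
    # the input lines may be processed in any order
    rng.shuffle(order)

    columns = defaultdict(list)
    for i in order:
        for column_index, value in enumerate(rows[i].split(",")):
            # grouping by column index stands in for the shuffle phase
            columns[column_index].append(value)

    # the reducer can only join the values in the order they arrived
    return [",".join(columns[c]) for c in sorted(columns)]


matrix = ["a,b,c", "d,e,f", "g,h,i"]
for seed in range(3):
    print(naive_transpose(matrix, seed))
```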

    This can be fixed if you can preprocess the file and add each line's number to it as an extra field.
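The preprocessing itself can be a short sequential script run once on a single machine before uploading the file. This is a sketch under assumptions: `add_line_index` and `number_file` are hypothetical names, blank lines are skipped, and numbering starts at 1 and is consecutive over the non-empty lines.

```python
def add_line_index(lines):
    """Prefix each non-empty line with its 1-based line number and a comma."""
    rows = [line.rstrip("\r\n") for line in lines if line.strip()]
    return ["%d,%s" % (number, row) for number, row in enumerate(rows, start=1)]


def number_file(src_path, dst_path):
    # hypothetical paths: read the raw matrix, write the indexed copy
    with open(src_path) as src, open(dst_path, "w") as dst:
        for row in add_line_index(src):
            dst.write(row + "\n")


print(add_line_index(["a,b,c\n", "d,e,f\n", "g,h,i\n"]))
```

Doing this step sequentially is the point: a single reader knows each line's global position, which the parallel map tasks do not.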

    If you can do that, the code is very simple.

    Let's say this is your file (the number on the left is the line index):

    1. a,b,c
    2. d,e,f
    3. g,h,i

    When the file flows through the EMR cluster, EMR breaks it into lines, so each line has to carry its own index. Let's say the line index is the first field, like this:
    1,a,b,c
    2,d,e,f
    3,g,h,i

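    import sys

    def mapper():
        for line in sys.stdin:
            # each input line looks like "<line_index>,<v1>,<v2>,..."
            fields = line.strip().split(",")
            if len(fields) < 2:
                continue  # skip blank or malformed lines
            line_index, values = fields[0], fields[1:]

            for column_index, value in enumerate(values):
                # emit a record whose key is the column index and whose value
                # is the line number and the value. The '\t' is the delimiter
                # Hadoop Streaming (used by EMR) uses by default to separate
                # the key from the value. The line looks like this:
                # "<column_index>\t<line_index>|<value>"
                print("%d\t%s|%s" % (column_index, line_index, value))

    def emit_row(cells):
        # order the cells of one column by their original line number,
        # comparing numerically so that "10" comes after "9"
        cells.sort(key=lambda cell: int(cell[0]))
        print(",".join(value for _, value in cells))

    def reducer():
        # Hadoop sorts the mapper output by key, so all records of one
        # column reach the same reducer consecutively
        current_column = None
        cells = []
        for line in sys.stdin:
            column_index, payload = line.rstrip("\n").split("\t", 1)
            line_index, value = payload.split("|", 1)

            if current_column is not None and column_index != current_column:
                emit_row(cells)  # the key changed: the previous column is complete
                cells = []
            current_column = column_index
            cells.append((line_index, value))

        if cells:
            emit_row(cells)  # flush the last column

    if __name__ == "__main__":
        # e.g. "python transpose.py map" or "python transpose.py reduce"
        if sys.argv[1] == "map":
            mapper()
        else:
            reducer()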
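A common way to check a streaming job before sending it to EMR is to pipe the input through the mapper, `sort` and the reducer on one machine. The sketch below does the same in a single process; `map_lines`, `reduce_lines` and `run_locally` are hypothetical names, and `sorted()` only approximates Hadoop's shuffle (it compares whole lines, whereas Hadoop groups by the key before the tab). It assumes the input already has the line index as its first field.

```python
from itertools import groupby


def map_lines(lines):
    for line in lines:
        fields = line.strip().split(",")
        if len(fields) < 2:
            continue
        line_index, values = fields[0], fields[1:]
        for column_index, value in enumerate(values):
            # key = column index, value = "<line_index>|<value>"
            yield "%d\t%s|%s" % (column_index, line_index, value)


def reduce_lines(sorted_lines):
    records = (line.split("\t", 1) for line in sorted_lines)
    # sorted input keeps all records of one column together
    for _, group in groupby(records, key=lambda kv: kv[0]):
        cells = [payload.split("|", 1) for _, payload in group]
        cells.sort(key=lambda cell: int(cell[0]))  # numeric line order
        yield ",".join(value for _, value in cells)


def run_locally(lines):
    # sorted() stands in for Hadoop's shuffle-and-sort phase
    return list(reduce_lines(sorted(map_lines(lines))))


indexed = ["1,a,b,c", "2,d,e,f", "3,g,h,i"]
print("\n".join(run_locally(indexed)))
```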
    

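    But even after all this, the lines in the output will not necessarily be in the order you need: each reducer writes its own output file, and by default the keys are compared as text, so "10" sorts before "2". You will have to sort them yourself; one possible way is to emit the index of each new line together with it.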
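One way to do that final sort, sketched under an assumption: the reducer is changed (hypothetically) to print the column index, a tab, and then the transposed row, and the part files are concatenated after the job finishes. `order_rows` is a hypothetical helper that sorts those lines numerically by the index and strips it.

```python
def order_rows(output_lines):
    """Sort "<column_index><TAB><row>" lines numerically and drop the index."""
    keyed = []
    for line in output_lines:
        if not line.strip():
            continue
        column_index, row = line.rstrip("\r\n").split("\t", 1)
        # int() avoids the text ordering where "10" sorts before "2"
        keyed.append((int(column_index), row))
    keyed.sort(key=lambda pair: pair[0])
    return [row for _, row in keyed]


# rows from two part files, arriving out of order
part_0 = ["2\tc,f,i\n", "0\ta,d,g\n"]
part_1 = ["1\tb,e,h\n"]
print(order_rows(part_0 + part_1))
```

Parsing the index as an integer is the important part: sorting the raw lines as strings would reproduce the same text ordering problem on matrices with ten or more columns.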