Tags: python, functional-programming, mapreduce, scheme, lisp

Map-reduce functional outline


Note: this is more of a basic programming question; it is not about Hadoop or the Map/Reduce frameworks for "big data processing".

Let's take a sequence (1 2 3 4 5):

To map some function over it, say square, I can do something like:

(define (map function sequence)
  ; apply the function to each element in the sequence
  ; we do not reduce it, but return a list
  (if (null? sequence)
      '()
      (cons (function (car sequence))
            (map function (cdr sequence)))))

(map (lambda (x) (* x x)) '(1 2 3 4 5))
; (1 4 9 16 25)
>>> list(map(lambda x: x*x, [1,2,3,4,5]))  # list() because Python 3's map is lazy
# [1, 4, 9, 16, 25]
>>> def mymap(function, sequence):
...     return [function(item) for item in sequence]

>>> mymap(lambda x: x*x, [1,2,3,4,5])
# [1, 4, 9, 16, 25]

Something like "map-reduce" would then involve roughly three steps (I think?), given a sequence:

  • map
  • filter (order may be swapped with map depending on what's being done)
  • reduce

Is that a correct understanding of the 'map-reduce' paradigm? Is it usually one function that looks like this:

mapreduce(map_function, filter_function, reduce_function, sequence)

Or how are the steps usually combined?
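
For concreteness, here is a rough sketch of what I imagine such a combined function could look like (mymapreduce is just a made-up name for illustration, not a real API):

import functools

def mymapreduce(map_function, filter_function, reduce_function, sequence, initial):
    # hypothetical composition: map, then filter, then reduce
    mapped = (map_function(item) for item in sequence)
    kept = (item for item in mapped if filter_function(item))
    return functools.reduce(reduce_function, kept, initial)

mymapreduce(lambda x: x*x,         # map: square
            lambda y: y % 2 == 0,  # filter: keep evens
            lambda a, b: a + b,    # reduce: sum
            [1, 2, 3, 4, 5],
            0)
# 20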


Solution

  • To give you the intuition, we need to step away (briefly) from a concrete implementation in code. MapReduce (and I'm not just talking about a particular implementation) is about the shape of the problem.

    Say we have a linear data structure (list, array, whatever) of xs, a transform function we want to apply to each of them, and an aggregation function that can be represented as repeated application of an associative pairwise combination:

        xA           xB
         |            |
     xform(xA)    xform(xB)
           \        /
    aggregator(xform(xA), xform(xB))
               |
             value
    

    And we can apply the aggregator recursively to the entire list/array/whatever of xs:

        xA           xB               xC
         |            |                |
     xform(xA)    xform(xB)        xform(xC)
         |            |                |
         yA           yB               yC
          \         /                  |
    aggregator(yA, yB)                 |
               |                      /
             value                   /
               |                    /
              aggregator(value, yC)
                       |
                  next_value
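
    A minimal sketch of that recursive pairwise combination in Python (tree_reduce is a name I made up; it assumes the aggregator is associative and the input non-empty):

    def tree_reduce(aggregator, xs):
        # split at an arbitrary point; associativity of the aggregator
        # means the choice of split point doesn't affect the result
        if len(xs) == 1:
            return xs[0]
        mid = len(xs) // 2
        left = tree_reduce(aggregator, xs[:mid])
        right = tree_reduce(aggregator, xs[mid:])
        # each half could, in principle, run on a different thread or box
        return aggregator(left, right)

    tree_reduce(lambda a, b: a + b, [x * x for x in range(1, 11)])
    # 385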
    

    You asked for Python or Scheme, but I find this easier to think about if we use types. The transformer xform takes a single argument of type A and returns a B: (x: A) -> B. The aggregator takes two arguments of type B and also returns a B: (x: B, y: B) -> B.
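
    In Python's typing notation, those two signatures might be sketched like this (the alias names Transformer and Aggregator are mine, just for illustration):

    from typing import Callable, TypeVar

    A = TypeVar("A")
    B = TypeVar("B")

    Transformer = Callable[[A], B]     # (x: A) -> B
    Aggregator = Callable[[B, B], B]   # (x: B, y: B) -> B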

    The simplest example of this, and one frequently over-used, is summing squares:

    import functools
    
    # Combiner
    def add(a, b):
        return a + b
    
    # Transformer
    def square(a):
        return a * a
    
    one_to_ten = range(1, 11)
    
    functools.reduce(add, map(square, one_to_ten), 0)
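
    (For the record, this evaluates to 385, i.e. 1 + 4 + 9 + ... + 100.)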
    

    Not very exciting. But what separates this from the more direct version any programmer in the world could write with a for loop, something that doesn't really show in the code (but does show in the diagram), is that the MapReduce version is completely parallelizable! You could easily chunk it out and run parts of it on different threads, different boxes, whatever. We have the transform, we have the combining function, and the associativity means the order of combination doesn't matter, so we can split up the processing however is convenient and recurse on the processed sub-chunks, regardless of where or when they were processed.
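
    As a rough sketch of that chunking in Python, using the standard library's concurrent.futures (the chunk boundaries below are arbitrary, which is exactly the point):

    import functools
    from concurrent.futures import ProcessPoolExecutor

    def add(a, b):
        return a + b

    def square(a):
        return a * a

    def sum_of_squares(chunk):
        # map + reduce over one chunk, independently of the others
        return functools.reduce(add, map(square, chunk), 0)

    if __name__ == "__main__":
        # split the input at an arbitrary point and farm the chunks out
        chunks = [range(1, 6), range(6, 11)]
        with ProcessPoolExecutor() as pool:
            partials = list(pool.map(sum_of_squares, chunks))
        # associativity lets us combine the per-chunk results in any order
        print(functools.reduce(add, partials, 0))  # 385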

    Now, not all problems can be broken down this way, but a surprising number of them can, and modeling them like this allows working with data sets that are too large to process on one box. Obviously, the naively written Python above can't do that, at least not today. But there's no reason a sufficiently smart compiler couldn't emit bytecode to do it that way.

    While I don't know Scheme, I do know Clojure, which does indeed provide a parallelized version of this exact thing:

    (require '[clojure.core.reducers :as r])

    ;; Transformer
    (defn square [x] (* x x))

    ;; pmap squares the elements in parallel; vec makes the result
    ;; foldable, so r/fold can combine in parallel via fork/join
    (r/fold + (vec (pmap square (range 1 11))))
    

    Note this isn't quite perfect: the mapping, while parallel, must complete before the (also parallel) combining takes place. But we're getting closer, and these are standard library calls.