How to do MapReduce on Riak with Erlang to get even values from all the keys storing the numbers from 1 to 1000

I am trying to run a MapReduce job on Riak with Erlang. I have data like the following:

Bucket = "Numbers"
{Keys,values} = {Random key,1},{Random key,2}........{Random key,1000}. 

I am storing 1000 values, from 1 to 1000. All the keys are auto-generated by passing the atom undefined as the key parameter, so each key holds one of the values from 1 to 1000.

I want to retrieve only the values that are even numbers. How can I achieve this using MapReduce?


  • You would construct phase functions as described in

    One possible map function:

    Mapfun = fun(Object, _KeyData, _Arg) ->
        %% get the object value, convert to integer and check if even
        Value = list_to_integer(binary_to_term(riak_object:get_value(Object))),
        case Value rem 2 of
          0 -> [Value];
          1 -> []
        end
    end.

    However, you probably don't want the job to fail completely if it encounters a sibling:

    Mapfun = fun(Object, _KeyData, _Arg) ->
        Values = riak_object:get_values(Object),
        case length(Values) of        %% checking for siblings
          1 ->                   %% only 1 value == no siblings
             I = list_to_integer(binary_to_term(hd(Values))),
             case I rem 2 of
                0 -> [I];    %% value is even
                1 -> []          %% value is odd
             end;
          _ -> []                %% What should happen with siblings?
        end
    end.

    There may also be other cases you need to either prevent or check for: a value containing non-numeric characters, an empty value, or deleted values (tombstones), just to name a few.
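
    As a rough sketch of the extra checks mentioned above, the decoding step can be wrapped so that bad values are skipped instead of crashing the map phase. ParseEven is a hypothetical helper name, not part of Riak. It assumes the value was stored as an encoded Erlang string (as the Mapfun above does) and treats anything else as "no result". Tombstone detection is not covered here.

    ```erlang
    %% Hypothetical helper: returns [I] for an even integer, [] for anything else.
    %% Assumes the stored value is term_to_binary/1 of a string such as "42".
    ParseEven = fun(Bin) ->
        try list_to_integer(binary_to_term(Bin)) of
            I when I rem 2 =:= 0 -> [I];   %% value is even
            _ -> []                        %% value is odd
        catch
            _:_ -> []   %% empty value, non-numeric text, or not an encoded term
        end
    end.
    ```

    The same try ... of ... catch pattern could replace the direct list_to_integer(binary_to_term(...)) call inside Mapfun.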

    A word of caution: a full-bucket MapReduce job requires Riak to read every value from disk. On a sizeable data set this can cause extreme latency and timeouts, so it is probably not something you want to do in production.
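
    One way to avoid scanning the whole bucket, if the keys of interest are known in advance, is to pass an explicit list of {Bucket, Key} pairs as the MapReduce input instead of the bucket name. A minimal sketch of building such a list follows. The bucket name and the "Key" ++ N naming scheme are assumptions for illustration; this would not apply to auto-generated keys unless you record them somewhere.

    ```erlang
    %% Assumed naming scheme: keys are <<"Key1">>, <<"Key2">>, ...
    %% Build {Bucket, Key} pairs for a small, known set of keys.
    Inputs = [ {<<"numbers">>, list_to_binary("Key" ++ integer_to_list(N))}
               || N <- lists:seq(1, 3) ].
    ```

    Inputs could then be given to mapred in place of the bucket name.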

    A full example of performing MapReduce (limited to the numbers 1 to 200 for space considerations):

    Assuming that you have cloned and built the riak-erlang-client, and using the second Mapfun from above, start an Erlang shell:

    erl -pa {path-to-riak-erlang-client}/ebin

    Define a reduce function to sort the list

    Reducefun = fun(List,_) -> lists:sort(List) end.

    Attach to the local Riak server

    {ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087).

    Generate some test data

    [ riakc_pb_socket:put(
                      Pid,
                      riakc_obj:new(
                                <<"numbers">>,   %% bucket
                                list_to_binary("Key" ++ V),V
                      )
      ) || V <- [ integer_to_list(Itr) || Itr <- lists:seq(1,200)]].

    The function to perform a MapReduce with this client is
    mapred(pid(), mapred_inputs(), [mapred_queryterm()])

    The query is a list of phase specifications, each of the form {Type, FunTerm, Arg, Keep} as defined in the readme. For this example, there are 2 phases:

    • a map phase that selects only even numbers
      {map, Mapfun, none, true}
    • a reduce phase that sorts the result
      {reduce, Reducefun, none, true}

    Perform the MapReduce query

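    {ok,Results} = riakc_pb_socket:mapred(
                                  Pid,  %% The socket pid from above
                                  <<"numbers">>,  %% Input is the bucket 
                                  %% anonymous funs are passed as {qfun, Fun}
                                  [{map,{qfun,Mapfun},none,true},
                                   {reduce,{qfun,Reducefun},none,true}]).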

    Results will be a list of [{_Phase Index_, _Phase Output_}], with a separate entry for each phase for which Keep was true. In this example both phases are marked keep, so Results will be
    [{0,[_map phase result_]},{1,[_reduce phase result_]}]
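
    To illustrate that shape, here is a minimal sketch using made-up sample data (the numbers are illustrative only, not output from Riak). It shows how the output of a single phase could be picked out by its index with lists:keyfind/3:

    ```erlang
    %% Illustrative data only: same shape as a result with two kept phases.
    SampleResults = [{0, [4, 2, 6]}, {1, [2, 4, 6]}].

    %% Phase indexes start at 0, so the reduce phase is index 1 here.
    {1, Sorted} = lists:keyfind(1, 1, SampleResults).
    ```

    If a phase was not kept, lists:keyfind/3 returns false and the match above would fail, so real code may want a case expression instead.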

    Print out the result of each phase:

    [ io:format("MapReduce Result of phase ~p:~n~P~n",[P,Result,500]) 
                    || {P,Result} <- Results ].

    When I ran this, my output was:

    MapReduce Result of phase 0:
    MapReduce Result of phase 1: