I am trying to run a MapReduce job on Riak with Erlang. My data looks like the following:
Bucket = "Numbers"
{Keys,values} = {Random key,1},{Random key,2}........{Random key,1000}.
I am storing 1000 values, from 1 to 1000. All the keys are autogenerated by passing the term undefined as the key parameter, so each random key holds one of the values from 1 to 1000.
I want only the values that are even numbers. How can I achieve this using MapReduce?
You would construct phase functions as described in http://docs.basho.com/riak/latest/dev/advanced/mapreduce/
One possible map function:
Mapfun = fun(Object, _KeyData, _Arg) ->
    %% get the object value, convert to integer and check if even
    Value = list_to_integer(binary_to_term(riak_object:get_value(Object))),
    case Value rem 2 of
        0 -> [Value];
        1 -> []
    end
end.
However, you probably don't want the job to fail outright when it encounters siblings:
Mapfun = fun(Object, _KeyData, _Arg) ->
    Values = riak_object:get_values(Object),
    case length(Values) of %% checking for siblings
        1 -> %% only 1 value == no siblings
            I = list_to_integer(binary_to_term(hd(Values))),
            case I rem 2 of
                0 -> [I]; %% value is even
                1 -> []   %% value is odd
            end;
        _ -> [] %% What should happen with siblings?
    end
end.
There may also be other cases you need to either prevent or check for: a value containing non-numeric characters, an empty value, or deleted values (tombstones), just to name a few.
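As a minimal sketch of guarding against some of those cases, the decoding step can be pulled into a helper that returns [] instead of crashing. EvenOrEmpty is a hypothetical name of mine, not part of Riak, and it assumes the values were stored as term_to_binary'd strings, as in the map functions above. It runs in a plain Erlang shell with no Riak required:

```erlang
%% Hypothetical helper: decode one stored value and return [Int] if it
%% is an even integer, [] for anything else.
EvenOrEmpty = fun(Bin) ->
    try list_to_integer(binary_to_term(Bin)) of
        I when I rem 2 =:= 0 -> [I];   %% even
        _ -> []                        %% odd
    catch
        %% badarg covers: not a valid external term (including an empty
        %% binary), not a string, or a string with non-numeric characters
        error:badarg -> []
    end
end.
```

The body of this helper could be inlined into the single-value branch of the second Mapfun. Whether silently skipping bad values is the right policy depends on your data.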
Edit:
A word of caution: a full-bucket MapReduce job requires Riak to read every value from disk, which could cause extreme latency and timeouts on a sizeable data set. It is probably not something you want to do in production.
Assuming that you have cloned and built the riak-erlang-client, start an Erlang shell with it on the code path:
erl -pa {path-to-riak-erlang-client}/ebin
Then define the second Mapfun from above in that shell.
Define a reduce function to sort the list
Reducefun = fun(List, _) ->
    lists:sort(List)
end.
Attach to the local Riak server
{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087).
Generate some test data
[ riakc_pb_socket:put(
      Pid,
      riakc_obj:new(
          <<"numbers">>,
          list_to_binary("Key" ++ V), V
      )
  ) || V <- [ integer_to_list(Itr) || Itr <- lists:seq(1,200) ] ].
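Note that V here is an Erlang string rather than a binary. As far as I know, the client serializes non-binary values with term_to_binary before sending them, which is why the map function decodes with binary_to_term followed by list_to_integer; treat that as an assumption about the client. The round trip itself can be checked locally:

```erlang
%% Simulate what is assumed to be stored for the value "42".
Stored = term_to_binary("42"),
%% Decode it the same way the map function does.
Decoded = list_to_integer(binary_to_term(Stored)).
%% Decoded is the integer 42
```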
The function to perform a MapReduce with this client is
mapred(pid(), mapred_inputs(), [mapred_queryterm()])
Each mapred_queryterm is a phase specification of the form {Type, FunTerm, Arg, Keep}, as defined in the readme. An anonymous fun is passed as the FunTerm {qfun, Fun}. For this example, there are 2 phases:
{map, {qfun, Mapfun}, none, true}
{reduce, {qfun, Reducefun}, none, true}
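To see what the two phases compute without touching Riak, here is a rough local simulation using plain lists. It only mimics the data flow (map emits a list per value, the outputs are concatenated, reduce sorts them); it is not how Riak executes the job, and LocalMap and LocalReduce are illustrative names:

```erlang
%% Map step: emit [V] for even values and [] for odd ones.
LocalMap = fun(V) ->
    case V rem 2 of
        0 -> [V];
        _ -> []
    end
end,
%% Reduce step: sort the combined map output.
LocalReduce = fun(List) -> lists:sort(List) end,
%% Feed the values in scrambled order, as a cluster might return them.
MapOut = lists:flatmap(LocalMap, [5, 8, 1, 2, 10, 3, 4]),
ReduceOut = LocalReduce(MapOut).
%% MapOut is [8,2,10,4]; ReduceOut is [2,4,8,10]
```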
Perform the MapReduce query
{ok,Results} = riakc_pb_socket:mapred(
    Pid,           %% The socket pid from above
    <<"numbers">>, %% Input is the bucket
    [{map,{qfun,Mapfun},none,true},
     {reduce,{qfun,Reducefun},none,true}]
).
Results will be a list of the form [{PhaseIndex, PhaseOutput}], with a separate entry for each phase for which Keep was true. In this example both phases are marked keep, so Results will be
[{0,[_map phase result_]},{1,[_reduce phase result_]}]
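If only the final output is needed, the entry for the reduce phase can be picked out by its index. A small sketch using made-up data in the shape described above (SampleResults is not real output):

```erlang
%% Made-up data: one {Index, Output} tuple per kept phase.
SampleResults = [{0, [4, 2, 6]}, {1, [2, 4, 6]}],
%% lists:keyfind/3 returns the first tuple whose 1st element is 1.
{1, Evens} = lists:keyfind(1, 1, SampleResults).
%% Evens is [2,4,6]
```

Setting Keep to false on the map phase would avoid returning the unsorted list in the first place.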
Print out the result of each phase:
[ io:format("MapReduce Result of phase ~p:~n~P~n",[P,Result,500])
|| {P,Result} <- Results ].
When I ran this, my output was:
MapReduce Result of phase 0:
[182,132,174,128,8,146,18,168,70,98,186,118,50,28,22,112,82,160,114,106,12,26,
124,14,194,64,122,144,172,96,126,162,58,170,108,44,90,104,6,196,40,154,94,
120,76,48,150,52,4,62,140,178,2,142,100,166,192,66,16,36,38,88,102,68,34,32,
30,164,110,42,92,138,86,54,152,116,156,72,134,200,148,46,10,176,198,84,56,78,
130,136,74,190,158,24,184,180,80,60,20,188]
MapReduce Result of phase 1:
[2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40,42,44,46,48,50,52,54,
56,58,60,62,64,66,68,70,72,74,76,78,80,82,84,86,88,90,92,94,96,98,100,102,
104,106,108,110,112,114,116,118,120,122,124,126,128,130,132,134,136,138,140,
142,144,146,148,150,152,154,156,158,160,162,164,166,168,170,172,174,176,178,
180,182,184,186,188,190,192,194,196,198,200]
[ok,ok]
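Regarding the caution about full-bucket jobs: mapred_inputs() can also be a list of {Bucket, Key} pairs, so when the keys are known the job can be limited to them. A sketch that builds such a list for the first ten test keys (the commented-out call assumes Pid, Mapfun and Reducefun from above):

```erlang
%% Build {Bucket, Key} inputs for Key1..Key10 from the test data.
Inputs = [ {<<"numbers">>, list_to_binary("Key" ++ integer_to_list(N))}
           || N <- lists:seq(1, 10) ].
%% riakc_pb_socket:mapred(Pid, Inputs,
%%     [{map, {qfun, Mapfun}, none, false},
%%      {reduce, {qfun, Reducefun}, none, true}]).
```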