
GridGain: MapReduce with node-local data processing?


I am trying to perform some numerical computation on a large distributed data set. The algorithms fit the MapReduce model well with the additional property that output from the map step is small in size compared to the input data. Data can be considered read-only and is statically distributed over the nodes (except for re-balancing on fail-over). Note that this is somewhat contrary to the standard word-count examples where the input data is sent to the nodes performing the map step.

This implies that the map step shall be executed in parallel on all nodes, processing each node's local data, while it is acceptable that the output from the map step is sent to one node for the reduce step.

What is the best way to implement this with GridGain?

It seems there was a reduce(..) method on the GridCache/GridCacheProjection interfaces in earlier versions of GridGain, but it is no longer present. Is there a replacement? I am thinking of a mechanism that takes a map closure and executes it in a distributed fashion on each datum exactly once, without copying any input data across the network.

The (somewhat manual) approach I have come up with so far is the following:

public class GridBroadcastCountDemo {

    public static void main(String[] args) throws GridException {
        try (Grid grid = GridGain.start(CONFIG_FILE)) {

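            // Broadcast the same job to every remote node; each job is meant to read
            // only the entries for which its own node is primary.
            GridFuture<Collection<Integer>> future = grid.forRemotes().compute().broadcast(new GridCallable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    GridCache<Integer, float[]> cache = grid.cache(CACHE_NAME);
                    int count = 0;
                    // Map step: count the elements of the node-local primary values.
                    for (float[] array : cache.primaryValues()) {
                        count += array.length;
                    }
                    return count;
                }
            });

            // Reduce step: sum the small per-node results on the calling node.
            int totalCount = 0;
            for (int count : future.get()) {
                totalCount += count;
            }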
            // expect size of input data
            System.out.println(totalCount);
        }
    }
}

However, this approach does not guarantee that each datum is processed exactly once. For example, if re-balancing takes place while the GridCallables are executing, part of the data could be processed zero times or more than once.


Solution

  • GridGain Open Source (which is now Apache Ignite) has a ComputeTask API that provides both map() and reduce() methods. If you are looking for a reduce() method, ComputeTask is the right API for you.

    For now your implementation is OK. Apache Ignite is adding a feature where a node will not be considered primary until the migration is fully finished. It should be coming soon.
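
To make the map()/reduce() split concrete, here is a minimal, library-free sketch of the shape described above: one map job per node that reads only that node's local data, followed by a single reduce over the small per-node results. This is not GridGain or Apache Ignite code. The names LocalMapReduceSketch, mapJob, reduce and run are hypothetical, and a thread pool stands in for the cluster nodes. It does not model re-balancing, so it says nothing about the exactly-once concern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a map/reduce task; no grid library is involved.
class LocalMapReduceSketch {

    // Map step: sees only one node's local arrays and returns a small result.
    static Callable<Integer> mapJob(final List<float[]> localData) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                int count = 0;
                for (float[] array : localData) {
                    count += array.length;
                }
                return count;
            }
        };
    }

    // Reduce step: folds the per-node counts into one total on a single "node".
    static int reduce(List<Integer> partialCounts) {
        int total = 0;
        for (int partial : partialCounts) {
            total += partial;
        }
        return total;
    }

    // Driver: submits one map job per simulated node, then reduces the results.
    static int run(List<List<float[]>> dataPerNode) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, dataPerNode.size()));
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<float[]> localData : dataPerNode) {
                futures.add(pool.submit(mapJob(localData)));
            }
            List<Integer> partials = new ArrayList<>();
            for (Future<Integer> future : futures) {
                partials.add(future.get());
            }
            return reduce(partials);
        } catch (Exception e) {
            throw new RuntimeException("map job failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<float[]>> dataPerNode = new ArrayList<>();
        dataPerNode.add(Arrays.asList(new float[3], new float[2])); // simulated node A
        dataPerNode.add(Arrays.asList(new float[4], new float[0])); // simulated node B
        System.out.println(run(dataPerNode)); // prints 9
    }
}
```

In a real ComputeTask, the map() method would take over the role of run() in assigning jobs to nodes, and the grid would move only the small Integer results to the node that executes reduce().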