Search code examples
javadistributed-computinggridgainignite

How to retry failed job on any node with Apache Ignite/GridGain


I'm experimenting with fault tolerance in Apache Ignite.

What I can't figure out is how to retry a failed job on any node. I have a use case where my jobs will be calling a third-party tool as a system process via process buildr to do some calculations. In some cases the tool may fail, but in most cases it's OK to retry the job on any node - including the one where it previously failed.

At the moment Ignite seems to reroute the job to another node which did not have this job before. So, after a while all nodes are gone and the task fails.

What I'm looking for is how to retry a job on any node.

Here's a test to demonstrate my problem.

Here's my randomly failing job:

public static class RandomlyFailingComputeJob implements ComputeJob {
    private static final long serialVersionUID = -8351095134107406874L;
    private final String data;

    public RandomlyFailingComputeJob(String data) {
        Validate.notNull(data);
        this.data = data;
    }

    public void cancel() {
    }

    public Object execute() throws IgniteException {
        final double random = Math.random();
        if (random > 0.5) {
            throw new IgniteException();
        } else {
            return StringUtils.reverse(data);
        }
    }
}

An below is the task:

public static class RandomlyFailingComputeTask extends
        ComputeTaskSplitAdapter<String, String> {
    private static final long serialVersionUID = 6756691331287458885L;

    @Override
    public ComputeJobResultPolicy result(ComputeJobResult res,
            List<ComputeJobResult> rcvd) throws IgniteException {
        if (res.getException() != null) {
            return ComputeJobResultPolicy.FAILOVER;
        }
        return ComputeJobResultPolicy.WAIT;
    }

    public String reduce(List<ComputeJobResult> results)
            throws IgniteException {
        final Collection<String> reducedResults = new ArrayList<String>(
                results.size());
        for (ComputeJobResult result : results) {
            reducedResults.add(result.<String> getData());
        }
        return StringUtils.join(reducedResults, ' ');
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize,
            String arg) throws IgniteException {
        final String[] args = StringUtils.split(arg, ' ');
        final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
                args.length);
        for (String data : args) {
            computeJobs.add(new RandomlyFailingComputeJob(data));
        }
        return computeJobs;
    }

}

Test code:

    final Ignite ignite = Ignition.start();
    final String original = "The quick brown fox jumps over the lazy dog";

    final String reversed = StringUtils.join(
            ignite.compute().execute(new RandomlyFailingComputeTask(),
                    original), ' ');

As you can see, should always be failovered. Since the probability of failure != 1, I expect the task to successfully terminate at some point.

With the probability threshold of 0.5 and a total of 3 nodes this hardly happens. I'm getting an exception like class org.apache.ignite.cluster.ClusterTopologyException: Failed to failover a job to another node (failover SPI returned null). After some debugging I've found out that this is because I eventually run out of nodes. All of the are gone.

I understand that I can write my own FailoverSpi to handle this.

But this just doesn't feel right.

First, it seems to be an overkill to do this.
But then the SPI is a kind of global thing. I'd like to decide per job if it should be retried or failed over. This may, for instance, depend on what the exit code of the third-party tool I'm invoking. So configuring failover over the global SPI isn't right.


Solution

  • Current implementation of AlwaysFailoverSpi (which is the default one) doesn't failover if it has already tried all nodes for a particular job. I believe it can be a configuration option, but for now you will have to implement your own failover SPI (it should be pretty simple - just pick a random node from the topology each time a job is trying to fail over).

    As for global nature of the SPI, you're right, but its failover() takes FailoverContext, which has information about failed job (task name, attributes, exception, etc.), so you can make decision based on this information.