java multithreading graph

Concurrency Issue with Recursive Graph Traversal in Java - Duplicate Vertex Processing


I am facing a concurrency issue while recursively traversing a directed graph in Java using multiple threads: a specific vertex (v8 in my example) is processed more than once. The visited set is a ConcurrentHashMap key set shared through an InheritableThreadLocal, yet the threads still do not appear to be synchronized correctly. I suspect the duplication comes from the asynchronous execution combined with missing synchronization. How can I synchronize this multithreaded traversal so that each vertex is processed only once?

import org.jgrapht.Graph;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.DefaultEdge;


import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


public class VectorVisitedLogicTest2 {

    public static void main(String[] args) {
        VectorVisitedLogicTest2 test = new VectorVisitedLogicTest2();
        test.run();
    }

    private void run() {
        System.out.println("Starting process");
        Graph<String, DefaultEdge> directedGraph
                = new DefaultDirectedGraph<>(DefaultEdge.class);
        directedGraph.addVertex("v1");
        directedGraph.addVertex("v2");
        directedGraph.addVertex("v3");
        directedGraph.addVertex("v4");
        directedGraph.addVertex("v5");
        directedGraph.addVertex("v6");
        directedGraph.addVertex("v7");
        directedGraph.addVertex("v8");
        directedGraph.addVertex("v9");
        directedGraph.addVertex("v10");
        directedGraph.addVertex("v11");
        directedGraph.addVertex("v12");
        directedGraph.addVertex("v13");

        directedGraph.addEdge("v1", "v2");
        directedGraph.addEdge("v1", "v3");
        directedGraph.addEdge("v1", "v13");
        directedGraph.addEdge("v2", "v4");

        directedGraph.addEdge("v4", "v5");
        directedGraph.addEdge("v4", "v6");
        directedGraph.addEdge("v3", "v7");
        directedGraph.addEdge("v7", "v8");
        directedGraph.addEdge("v5", "v8");
        directedGraph.addEdge("v6", "v8");

        directedGraph.addEdge("v8", "v9");
        directedGraph.addEdge("v8", "v10");
        directedGraph.addEdge("v8", "v12");
        directedGraph.addEdge("v9", "v11");
        directedGraph.addEdge("v10", "v11");
        directedGraph.addEdge("v11", "v12");
        directedGraph.addEdge("v12", "v13");

        InheritableThreadLocal<Set<String>> visited = new InheritableThreadLocal();
        Thread thread = new Thread( () -> {
            try {
                visited.set(ConcurrentHashMap.newKeySet());
                execute(directedGraph, "v1",visited);
            } catch (InterruptedException e) {
                System.out.println(e);
                throw new RuntimeException(e);
            }
        }
        );
        thread.start();
    }

    public void execute(Graph<String, DefaultEdge> graph, String vertex, ThreadLocal<Set<String>> visited) throws InterruptedException {
        System.out.println(vertex + "-" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
        visited.get().add(vertex);
        for (DefaultEdge edge : graph.outgoingEdgesOf(vertex)) {
            String neighbor = graph.getEdgeTarget(edge);
            Set<String> neighbourParents = new HashSet<>();
            for (DefaultEdge parentEdge : graph.incomingEdgesOf(neighbor)) {
                neighbourParents.add(graph.getEdgeSource(parentEdge));
            }
            if(check(visited, neighbor, neighbourParents)) {
                Thread thread = new Thread(() -> {
                    try {
                        System.out.println("Calling : " + neighbor + "from : " + vertex );
                        execute(graph, neighbor, visited);
                    } catch (Exception e) {
                        System.out.println(e);
                        throw new RuntimeException(e);
                    }
                });
                thread.start();
            }
        }
    }
    private synchronized boolean check(ThreadLocal<Set<String>> visited, String neighbor, Set<String> neighbourParents) {
        return !visited.get().contains(neighbor) && visited.get().containsAll(neighbourParents);
    }

}

I tried introducing synchronized blocks, but the race condition still occurs.

Expected:

Starting process
v1-2024-01-31 19:01:01.413
Calling : v3from : v1
Calling : v2from : v1
v3-2024-01-31 19:01:01.551
v2-2024-01-31 19:01:01.550
Calling : v7from : v3
Calling : v4from : v2
v7-2024-01-31 19:01:01.553
v4-2024-01-31 19:01:01.553
Calling : v5from : v4
Calling : v6from : v4
v5-2024-01-31 19:01:01.554
v6-2024-01-31 19:01:01.554
Calling : v8from : v6
v8-2024-01-31 19:01:01.554
Calling : v9from : v8
v9-2024-01-31 19:01:01.555
Calling : v10from : v8
v10-2024-01-31 19:01:01.556
Calling : v11from : v10
v11-2024-01-31 19:01:01.557
Calling : v12from : v11
v12-2024-01-31 19:01:01.557
Calling : v13from : v12
v13-2024-01-31 19:01:01.558

Actual:

Starting process
v1-2024-01-31 18:54:00.859
Calling : v3from : v1
Calling : v2from : v1
v3-2024-01-31 18:54:01.035
v2-2024-01-31 18:54:01.035
Calling : v7from : v3
Calling : v4from : v2
v7-2024-01-31 18:54:01.038
v4-2024-01-31 18:54:01.038
Calling : v5from : v4
Calling : v6from : v4
v6-2024-01-31 18:54:01.040
v5-2024-01-31 18:54:01.040
Calling : v8from : v6
Calling : v8from : v5
v8-2024-01-31 18:54:01.043
v8-2024-01-31 18:54:01.044
Calling : v9from : v8
v9-2024-01-31 18:54:01.047
Calling : v9from : v8
v9-2024-01-31 18:54:01.054
Calling : v10from : v8
Calling : v10from : v8
v10-2024-01-31 18:54:01.060
v10-2024-01-31 18:54:01.060
Calling : v11from : v10
Calling : v11from : v10
v11-2024-01-31 18:54:01.063
v11-2024-01-31 18:54:01.063
Calling : v12from : v11
v12-2024-01-31 18:54:01.065
Calling : v12from : v11
Calling : v13from : v12
v12-2024-01-31 18:54:01.067
v13-2024-01-31 18:54:01.067
Calling : v13from : v12
v13-2024-01-31 18:54:01.068

'v8' is processed twice. It should be processed only once, triggered by one of its parent vertices.


Solution

  • Concurrent hash maps and concurrent hash sets do not provide thread safety by magic; the only thing they offer is that they will not crash and burn under multi-threaded use. However, you still have to use them properly under conditions of concurrency.

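    User "Thomas" has already pointed out in a comment what you are doing wrong. The expression visited.get().contains(neighbor) may return false at the moment that it gets invoked, but it may very well return true a microsecond later, before the enclosing if() statement completes, or before the enclosing function returns. In your run, the threads for v5 and v6 both saw that v8 was not yet in the set, so both started a thread for it. Making check() synchronized does not help, because the vertex is only added to the set later, inside execute(), on the newly started thread.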

    To fix this, take advantage of the fact that the Set.add() method returns a boolean to indicate whether an item was in fact added to the set, or whether the item was already present, and therefore not added. On a concurrent set this check-and-insert is a single atomic step, so exactly one thread gets true for any given vertex. If Set.add() returns false, then abort the rest of the function.
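
    A minimal sketch of that idea follows. It is not a drop-in patch for the code in the question: to keep it self-contained it assumes plain adjacency maps in place of the JGraphT graph, a shared field in place of the InheritableThreadLocal, and a hypothetical class name (ClaimOnceTraversal) and thread queue that exist only so the caller can wait for the traversal to finish. The part that matters is the first statement of execute().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name; the edges are the ones from the question.
class ClaimOnceTraversal {
    static final String[][] EDGES = {
        {"v1", "v2"}, {"v1", "v3"}, {"v1", "v13"}, {"v2", "v4"},
        {"v4", "v5"}, {"v4", "v6"}, {"v3", "v7"}, {"v7", "v8"},
        {"v5", "v8"}, {"v6", "v8"}, {"v8", "v9"}, {"v8", "v10"},
        {"v8", "v12"}, {"v9", "v11"}, {"v10", "v11"}, {"v11", "v12"},
        {"v12", "v13"}
    };

    // Assumption: plain maps stand in for the JGraphT graph. They are filled
    // in the constructor and only read once the threads are running.
    private final Map<String, List<String>> successors = new HashMap<>();
    private final Map<String, Set<String>> predecessors = new HashMap<>();

    private final Set<String> visited = ConcurrentHashMap.newKeySet();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

    ClaimOnceTraversal() {
        for (String[] e : EDGES) {
            successors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            predecessors.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
    }

    void execute(String vertex) {
        // Atomic claim: add() returns true for exactly one caller per vertex,
        // so a second thread arriving for the same vertex stops here.
        if (!visited.add(vertex)) {
            return;
        }
        processed.add(vertex); // stands in for the real work on the vertex
        for (String next : successors.getOrDefault(vertex, List.of())) {
            // Same precondition as in the question: all parents already visited.
            // Several parents may pass this test for the same child; the claim
            // at the top of execute() makes the extra calls harmless.
            if (visited.containsAll(predecessors.get(next))) {
                spawn(next);
            }
        }
    }

    private void spawn(String vertex) {
        Thread t = new Thread(() -> execute(vertex));
        t.start();
        threads.add(t); // enqueued after start() so join() never sees an unstarted thread
    }

    // Runs the traversal and returns the vertices in the order they were processed.
    List<String> run(String start) {
        spawn(start);
        try {
            Thread t;
            // A thread enqueues its children before it terminates, so an empty
            // queue after the last join() means no thread is still running.
            while ((t = threads.poll()) != null) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(new ClaimOnceTraversal().run("v1"));
    }
}
```

    With this guard, two threads can still be started for v8 (so a "Calling" log line may appear twice), but only the one whose add() returns true goes on to process it. The processing order still varies from run to run.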