Here is my code:
class Processor implements Runnable {
private int id;
private Integer interaction;
private Set<Integer> subset;
public volatile static AtomicBoolean notRemoved = new AtomicBoolean(true);
public Object<E> dcp;
public Iterator<Integer> iterator;
public Processor(int id, Integer interaction, Set<Integer> subset, Object<E> dcp, Iterator<Integer> iterator) {
this.id = id;
this.interaction = interaction;
this.subset= subset;
this.dcp = dcp;
this.iterator = iterator;
}
public void run() {
while (Processor.notRemoved.get()){
System.out.println("Starting: " + this.id);
if (this.dcp.PA.contains(this.interaction)){
this.subset.add(this.interaction);
this.dcp.increaseScore(this.subset);
if (!this.subset.contains(this.interaction) && Processor.notRemoved.get()){
Processor.notRemoved.set(false);
System.out.println(iterator);
iterator.remove();
}
}
System.out.println("Completed: " + this.id);
}
}
}
public class ConcurrentApp {
public void multiFunction(Object<E> dcp, int threads) {
ExecutorService executor = Executors.newFixedThreadPool(threads);
int i =1;
while ((dcp.PA.size() > i) && (i <= dcp.R)){
for (Iterator<Integer> iterator = dcp.PA.iterator(); iterator.hasNext();){
Integer interaction = iterator.next();
ArrayList<Integer> removed = new ArrayList<Integer>(dcp.PA);
removed.remove(interaction);
ArrayList<Set<Integer>> subsets = dcp.getSubsets(removed, i);
for (int j = 0; j< subsets.size(); j++){
executor.submit(new Processor(j, interaction, subsets.get(j), dcp, iterator));
}
Processor.notRemoved.set(true); //Pretty sure this is necessary
}
i++;
}
executor.shutdown();
System.out.println("All tasks submitted");
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("All tasks completed");
}
}
As I expected, I am getting java.util.ConcurrentModificationException since multiple threads are trying to simultaneously access my iterator object. I saw a lot of alternatives to using an ArrayList (which is what the field dcp.PA is). I tried using CopyOnWriteArrayList but for some reason I keep getting more problems when I introduce this thread safe implementation of ArrayList. I was wondering if anyone can point me in the right direction as to what thread-safe data structure I should use and how I should use them within my code or other solutions to getting around the concurrent modification error.
Maybe it's time to stop asking, "How do I use this particular technique?" and start asking, "How do I accomplish my goal?" Your technique of using an iterator passed in as a parameter is very brittle. And not needed, really.
Resist the temptation to pass iterators around as parameters, let alone to multiple threads. That's a big no-no. You are almost explicitly asking for ConcurrentModificationException
. And over what collection does the iterator iterate? There's nothing in your Processor
class to enforce coherency. Oh, I know you have the iterator connected to the collection in ConcurrentApp
, but the Processor
method is public and thus Processor
cannot trust in such a connection.
And notRemoved
is confusing. What does "removed" even mean? I am also deeply suspicious of its being static
when nothing else in your algorithm is.
So the beginning part of your answer is to clean up all the antipatterns in your code. Keep iterators in the same scope as the variables that generate them. Follow the naming conventions. Show all relevant information here if you expect help.
Once you've refactored your code and gotten all the weirdness out, it becomes a lot easier to make removals thread-safe.
void processRemoval(Collection<E> collection) {
synchronized(lock) {
for (Iterator<E> iter = collection.iterator(); iter.hasNext(); ) {
final E element = iter.next();
if (shouldRemove(element) {
iter.remove();
}
}
}
}
HTH