Search code examples
javasortinghashmapparallelstream

Trying to use Map.values().parallelStream().forEach(list -> list.sort(comparator)) but get error: "Comparison method violates its general contract!"


I am trying to use multi-threading to sort arrays that are stored in a map. There are a large amount of records, ~3.1 million, and thus while I am attempting to sort these records in a single threaded for loop it takes many hours to complete. I'm hoping to get this time down as much as possible, ideally within a few minutes (please don't laugh!).

Stacktrace:

    Exception in thread "main" java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
    at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:562)
    at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:591)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:689)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:765)
    at com.salesforce.process.Process.startProcess(Process.java:51)
    at com.salesforce.process.Schedule.main(Schedule.java:10)
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.base/java.util.TimSort.mergeLo(TimSort.java:781)
    at java.base/java.util.TimSort.mergeAt(TimSort.java:518)
    at java.base/java.util.TimSort.mergeCollapse(TimSort.java:448)
    at java.base/java.util.TimSort.sort(TimSort.java:245)
    at java.base/java.util.Arrays.sort(Arrays.java:1307)
    at java.base/java.util.ArrayList.sort(ArrayList.java:1721)
    at com.salesforce.process.Process.lambda$startProcess$0(Process.java:51)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1779)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

Class Object:

public class MyObject {
private Integer id;
public String someString;
public Double sortableValue;

... contructors & getters and setters ...
public static Comparator<MyObject> SortableValueComparator = new Comparator<MyObject>() {

    public int compare(MyObject ds1, MyObject ds2) {
       Double sortableValue1 = ds1.getSortableValue();
       Double sortableValue2 = ds2.getSortableValue();
       //descending order      
       if (Double.compare(sortableValue1, sortableValue2) == 0) {
            return 0;
        }
        else if (Double.compare(sortableValue1, sortableValue2) < 0) {
            return -1;
        }
        else {
            return 1;
        }
    }
};

The Code:

And I'm trying to execute this in the code like this:

Map<String,List<MyObject>> map = new HashMap<String,List<MyObject>>();
// inject 3.1 million keys with List<MyObject> values, with 1-10 items in each list.

map.values().parallelStream().forEach(list -> list.sort(MyObject.SortableValueComparator));

NOTE: this is not what I want to do, but I initially wrote the code like this and it worked. That is to say, my comparator works if I do it this way.

for (List<MyObject> list : map.values()) {
            Collections.sort(list, MyObject.SortableValueComparator);    
        }

however, it takes for.ev.er to complete, which sadly is not acceptable for our business case. What can this noob do to make this parallelStream() or some way of threading this work?? If you need more info, please let me know! Thanks so much!!

Edit: I want to also give you guys a sample of the data below. So this is a Map<String,List<MyObject>>.

key (String): "key1", values (List<MyObject>): [{"a",0.0112},{"b",0.12},{"c",0.00512}]
key: "key2", values: [{"d",0.0922},{"a",0.0112},{"f",0.23}]
key: "key3", values: [{"z",0.141},{"w",0.432},{"x",0.0001}]

so, If I wanted to sort key3 list of objects, they would return like this:

key: "key3", values: [{"w",0.432},{"z",0.141},,{"x",0.0001}]

and, I want to do this sort function on every record.


Solution

  • Instead of using

    Map.values().parallelStream().forEach(list -> list.sort(comparator))
    

    I used

    Map.values().Stream().forEach(list -> list.sort(comparator))
    

    and it worked!