Tags: java, multithreading, java-8, java-stream

Java distinct causing thread block


In short, a chain of distinct().limit(3).forEach() is blocking my thread, which makes the container unhealthy. My service runs on ECS Fargate.

@Builder(toBuilder = true)
@ToString
@Getter
@EqualsAndHashCode
public class TestPojo {
    private String pojoId;

    private String score;
    @EqualsAndHashCode.Exclude
    private boolean isRandomOrdered;
}


private List<TestPojo> randomizeOrder(final List<TestPojo> testObjects) {
    // Picking testObjects randomly only from top 25
    final int topLimit = Math.min(testObjects.size(), 25);
    final Set<TestPojo> newOrderedPojos = new LinkedHashSet<>();
    // First 2 positions are fixed
    newOrderedPojos.add(testObjects.get(0)); // Validations are present in calling function to make sure that get(0) and get(1) don't throw out of bound error.
    newOrderedPojos.add(testObjects.get(1)); 
    new Random().ints(2, topLimit).distinct().limit(3)
         .forEach(i -> newOrderedPojos.add(testObjects.get(i).toBuilder()
                                               .isRandomOrdered(true)
                                               .build()));

    // Add remaining objects. Since this is a set only objects not already present will be inserted.
    newOrderedPojos.addAll(testObjects);

    return ImmutableList.copyOf(newOrderedPojos);
}

The code above intermittently hangs a thread. Because the thread never returns, the health check fails and the whole container is marked unhealthy.

11 Jul 2023 16:19:12,942 [WARN]   (abc-periodic-metrics-0) com.abc.xyz.bobcat.StuckThreadDetectingValve: Thread Bobcat-2 is detected as stuck as it has been blocked for 4442432 ms
com.abc.xyz.bobcat.StuckThreadDetectingValve$StuckThreadException: Check stacktrace to see where the thread is stuck
    at java.util.HashMap.hash(HashMap.java:340)
    at java.util.HashMap.containsKey(HashMap.java:597)
    at java.util.HashSet.contains(HashSet.java:204)
    at java.util.stream.DistinctOps$1$2.accept(DistinctOps.java:173)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.Random$RandomIntsSpliterator.tryAdvance(Random.java:1029)
    at java.util.stream.IntPipeline.forEachWithCancel(IntPipeline.java:162)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateSequential(ForEachOps.java:188)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.IntPipeline.forEach(IntPipeline.java:427)
    at com.abc.service.xyz.randomizeAsinOrder(TestClass.java:123)

Does anyone know what the issue could be? I checked the Javadoc for distinct(), limit() and forEach(), but none of them indicates that they are thread-unsafe or that this could happen.


Solution

This line never finishes:

    new Random().ints(0, 2).distinct().limit(3).forEach(System.out::println); // Can only generate 0 and 1.

However, this one does finish:

    IntStream.of(0, 1).distinct().limit(3).forEach(System.out::println);
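To confirm that a finite source is what lets the pipeline finish, Random also has an ints(streamSize, origin, bound) overload that produces a fixed number of values. In the sketch below (the class and method names are made up for illustration), the same distinct().limit(3) chain over at most 1,000 draws from 0 and 1 terminates and returns the distinct values it found, which can never be more than two.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class DistinctLimitDemo {

    // Hypothetical helper: draws a fixed number of values from [0, 2),
    // keeps the distinct ones and asks for three. Because the source is
    // finite, the pipeline ends when the draws run out.
    static List<Integer> distinctValues(long draws) {
        return new Random()
                .ints(draws, 0, 2)   // streamSize, origin (inclusive), bound (exclusive)
                .distinct()          // at most two values (0 and 1) can pass
                .limit(3)            // never satisfied, but the source runs out
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = distinctValues(1_000);
        // Prints at most two values, in the order they were first drawn.
        System.out.println(values);
    }
}
```

The same overload could bound the question's stream as well, but a fixed number of draws may then return fewer distinct values than are actually available, so it illustrates the mechanism rather than fixing the selection.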

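The difference is the source. Random.ints(origin, bound) returns an effectively unlimited stream. distinct() passes along only values it has not seen before, and limit(3) can stop the pipeline only after it has received three of them. When the range holds fewer than three possible values, the third unique value never arrives, so the thread keeps drawing random numbers and checking each one against distinct()'s internal HashSet forever, which is exactly where the stack trace shows it. The thread is not waiting on a lock and this is not a thread-safety problem; it is busy-looping. In the second example the source is finite, so once IntStream.of(0, 1) is exhausted the pipeline ends with two elements even though limit(3) was never satisfied.

In the question's code, ints(2, topLimit) can only produce the topLimit - 2 indices from 2 to topLimit - 1. Whenever testObjects has 3 or 4 elements there are only 1 or 2 candidates, so forEach never returns, which explains why the hang is intermittent. (With exactly 2 elements, ints(2, 2) throws IllegalArgumentException instead, because the origin must be less than the bound.)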
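A minimal sketch of one way to make the selection always terminate, assuming the goal is "up to three random picks from indices 2 to topLimit - 1": shuffle the finite list of candidate indices and take at most three. To keep it self-contained it uses a generic element type T instead of TestPojo, and it drops the toBuilder().isRandomOrdered(true) copy and Guava's ImmutableList; the class name SafeRandomizer and the Random parameter are hypothetical. This swaps Random.ints() for Collections.shuffle and is not the original author's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SafeRandomizer {

    // Hypothetical generic stand-in for randomizeOrder(List<TestPojo>).
    // Assumes items.size() >= 2, as the original caller guarantees.
    static <T> List<T> randomizeOrder(List<T> items, Random random) {
        int topLimit = Math.min(items.size(), 25);

        // Candidate indices 2..topLimit-1; empty when topLimit == 2.
        List<Integer> candidates = IntStream.range(2, topLimit)
                .boxed()
                .collect(Collectors.toCollection(ArrayList::new));
        Collections.shuffle(candidates, random);

        Set<T> result = new LinkedHashSet<>();
        result.add(items.get(0)); // first two positions stay fixed
        result.add(items.get(1));
        // Take up to 3 random picks, never more than exist, so this always terminates.
        for (int i : candidates.subList(0, Math.min(3, candidates.size()))) {
            result.add(items.get(i));
        }
        result.addAll(items); // append everything not already present
        return new ArrayList<>(result);
    }
}
```

Because the candidate list is finite, the loop runs at most three times whatever the input size. With a two-element input the candidate range is simply empty, whereas the original new Random().ints(2, 2) would throw IllegalArgumentException. Passing the Random in also makes the order reproducible in tests.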