I have a list of TxnType
values with different complexities (process durations).
I want to find matched TxnType
from the list.
I tried to implement it by mixing parallel processing and short-circuit filter features of the stream, but I noticed there is not a mixture of them.
I wrote the below sample. But noticed a mix of parallel and short-circuit not work properly.
Every run shows parallel processing working but not terminating when found as soon as found item!!!
class TxnType {
public String id;
public TxnType(String id) {this.id = id;}
public boolean match(String entry) {
Date s = new Date();
// simulate long processing match time TxnType
if (id.equals("1")) {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Date f = new Date();
System.out.println("check id = " + id+ " duration = "+(f.getTime()- s.getTime()));
return id.equalsIgnoreCase(entry);
}
}
private void test4() {
// build list of available TxnTypes
ArrayList<TxnType> lst = new ArrayList<>();
lst.add(new TxnType("0"));
lst.add(new TxnType("1")); // long match processing time type
lst.add(new TxnType("2"));
lst.add(new TxnType("3"));
lst.add(new TxnType("4"));
String searchFor = "3";
System.out.println("searchFor = " + searchFor);
Date st, fi;
st = new Date();
Optional<TxnType> found2 =lst.stream().parallel().filter(txnType->txnType.match(searchFor)).findFirst();
System.out.println("found.stream().count() = " + found2.stream().count());
fi= new Date();
System.out.println("dur="+ (fi.getTime()- st.getTime()));
}
By running multiple times, I found that the processing was not terminated as soon as possible and wait to process all of them!!!!
searchFor = 3
check id = 4 duration = 0
check id = 2 duration = 0
check id = 3 duration = 0
check id = 0 duration = 0
check id = 1 duration = 4005
found.stream().count() = 1
dur=4050
Is there something like filterFindFirst()
?
Your mistake is using findFirst
, rather than findAny
.
Note that 1
is ordered before the element that you expect to be found (3
). So it has to finish checking 1
first, before it can conclude that "3
is first element that matches the predicate", even if they are done in parallel. If it found 3
, and haven't started checking something further down the list yet, then it won't start that. This is what short-circuiting in findFirst
means.
findAny
on the other hand, doesn't care about the order. If it finds any element that satisfies the predicate, it does not start to check anything new anymore.
Now, even if you changed to findAny
, you may still find that it takes 4 seconds to complete. This is because there are too few elements in the list, compared to how many threads the stream pipeline can create. So the processing for all the elements start, and once it has started, it will not be interrupted, even if it has already found an element that satisfies the predicate.
If you put more elements into the list:
for (int i = 0 ; i < 100 ; i++) {
lst.add(new TxnType("foo"));
}
...
Optional<TxnType> found2 = lst.parallelStream().filter(txnType -> txnType.match(searchFor)).findAny();
Then the processing of 1
is less likely to be started before the processing of 3
finishes, and you will get a much quicker run. This will not happen every time though. There's no guarantee that 1
won't get processed before 3
.
Basically, the short-circuiting is working correctly. It's just that
findFirst
won't short-circuit as aggressively as you'd like