I made collector who can reduce a stream to a map which has the keys as the items that can be bought by certain customers and the names of customers as values, my implementation is working proberly in sequential stream
but when i try to use parallel
it's not working at all, the resulting sets always contain one customer name.
List<Customer> customerList = this.mall.getCustomerList();
Supplier<Object> supplier = ConcurrentHashMap<String,Set<String>>::new;
BiConsumer<Object, Customer> accumulator = ((o, customer) -> customer.getWantToBuy().stream().map(Item::getName).forEach(
item -> ((ConcurrentHashMap<String,Set<String>>)o)
.merge(item,new HashSet<String>(Collections.singleton(customer.getName())),
(s,s2) -> {
HashSet<String> res = new HashSet<>(s);
res.addAll(s2);
return res;
})
));
BinaryOperator<Object> combiner = (o,o2) -> {
ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>((ConcurrentHashMap<String,Set<String>>)o);
res.putAll((ConcurrentHashMap<String,Set<String>>)o2);
return res;
};
Function<Object, Map<String, Set<String>>> finisher = (o) -> new HashMap<>((ConcurrentHashMap<String,Set<String>>)o);
Collector<Customer, ?, Map<String, Set<String>>> toItemAsKey =
new CollectorImpl<>(supplier, accumulator, combiner, finisher, EnumSet.of(
Collector.Characteristics.CONCURRENT,
Collector.Characteristics.IDENTITY_FINISH));
Map<String, Set<String>> itemMap = customerList.stream().parallel().collect(toItemAsKey);
There is certainly a problem in my accumulator
implementation or another Function
but I cannot figure it out! could anyone suggest what should i do ?
Your combiner is not correctly implemented.
You overwrite all entries that has the same key. What you want is adding values to existing keys.
BinaryOperator<ConcurrentHashMap<String,Set<String>>> combiner = (o,o2) -> {
ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>(o);
o2.forEach((key, set) -> set.forEach(string -> res.computeIfAbsent(key, k -> new HashSet<>())
.add(string)));
return res;
};