java, java-8, java-stream, collectors

Accumulator not working properly in parallel stream


I wrote a collector that reduces a stream of customers to a map whose keys are the names of the items customers want to buy and whose values are the names of the customers who want each item. My implementation works properly on a sequential stream, but when I use a parallel stream it does not work at all: the resulting sets always contain only one customer name.

List<Customer> customerList = this.mall.getCustomerList();

Supplier<Object> supplier = ConcurrentHashMap<String,Set<String>>::new;

BiConsumer<Object, Customer> accumulator = ((o, customer) -> customer.getWantToBuy().stream().map(Item::getName).forEach(
            item -> ((ConcurrentHashMap<String,Set<String>>)o)
                    .merge(item,new HashSet<String>(Collections.singleton(customer.getName())),
                            (s,s2) -> {
                                HashSet<String> res = new HashSet<>(s);
                                res.addAll(s2);
                                return res;
                            })
    ));

BinaryOperator<Object> combiner = (o,o2) -> {
        ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>((ConcurrentHashMap<String,Set<String>>)o);
        res.putAll((ConcurrentHashMap<String,Set<String>>)o2);
        return res;
    };

Function<Object, Map<String, Set<String>>> finisher = (o) -> new HashMap<>((ConcurrentHashMap<String,Set<String>>)o);

Collector<Customer, ?, Map<String, Set<String>>> toItemAsKey =
        new CollectorImpl<>(supplier, accumulator, combiner, finisher, EnumSet.of(
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.IDENTITY_FINISH));

Map<String, Set<String>> itemMap = customerList.stream().parallel().collect(toItemAsKey);

There is certainly a problem in my accumulator or in one of the other functions, but I cannot figure it out. Could anyone suggest what I should do?


Solution

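  • Your combiner is not correctly implemented.
    putAll overwrites every entry that has the same key, so when both partial maps contain an item, the set of names from the first map is replaced by the set from the second. What you want is to add the values to the existing keys.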

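    BinaryOperator<ConcurrentHashMap<String,Set<String>>> combiner = (o,o2) -> {
            // Start from a shallow copy of the first partial result
            ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>(o);
            // Add each name from the second partial result to the set stored under the same key
            o2.forEach((key, set) -> set.forEach(string -> res.computeIfAbsent(key, k -> new HashSet<>())
                                                              .add(string)));
            return res;
        };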
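
To see the merging combiner in a complete program, here is a minimal sketch. It makes several assumptions that are not in the question: the class ItemMapDemo and its simplified Customer (which stores item names directly instead of Item objects) are hypothetical stand-ins, and the collector is built with the public Collector.of factory and a plain HashMap rather than CollectorImpl and ConcurrentHashMap. Since no CONCURRENT characteristic is declared here, each thread fills its own map and the combiner joins the partial maps.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collector;

public class ItemMapDemo {

    // Hypothetical, simplified stand-in for the question's Customer class
    public static final class Customer {
        final String name;
        final List<String> wantToBuy; // item names only, for brevity

        public Customer(String name, String... items) {
            this.name = name;
            this.wantToBuy = Arrays.asList(items);
        }
    }

    public static Collector<Customer, ?, Map<String, Set<String>>> toItemAsKey() {
        return Collector.of(
                // supplier: a fresh container for each partial result
                HashMap::new,
                // accumulator: register the customer under every item they want
                (map, customer) -> customer.wantToBuy.forEach(item ->
                        map.computeIfAbsent(item, k -> new HashSet<>()).add(customer.name)),
                // combiner: union the sets for shared keys instead of overwriting them
                (left, right) -> {
                    right.forEach((item, names) ->
                            left.merge(item, names, (a, b) -> { a.addAll(b); return a; }));
                    return left;
                });
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("Ann", "book", "pen"),
                new Customer("Bob", "book"),
                new Customer("Cid", "pen", "ink"));

        Map<String, Set<String>> itemMap =
                customers.parallelStream().collect(toItemAsKey());

        // Each item should now list every customer who wants it
        System.out.println(itemMap);
    }
}
```

With this combiner, the sequential and parallel runs should produce the same map, because a key present in both partial results keeps the names from both sides.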