
Java collector teeing a list of inputs


I am trying to implement a simple collector, which takes a list of collectors and simultaneously collects values in slightly different ways from a stream.

It is quite similar to Collectors.teeing, but differs in that it

  1. Receives a list of collectors instead of just two
  2. Requires all collectors to produce a value of the same type
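For comparison, here is how the existing two-collector Collectors.teeing (available since Java 12) is used. Both downstream collectors see every element of the stream, and a merger function combines their two results. The class and method names below are made up for illustration. Here both collectors happen to produce a Long, which matches the same-type restriction from point 2.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TeeingDemo {
    // Counts and sums the same stream in one pass: Collectors.teeing feeds
    // every element to both downstream collectors, then calls the merger once.
    static List<Long> countAndSum(Stream<Integer> numbers) {
        return numbers.collect(Collectors.teeing(
                Collectors.counting(),                      // first result: Long
                Collectors.summingLong(Integer::longValue), // second result: Long
                List::of));                                 // merger: (Long, Long) -> List<Long>
    }
}
```

For example, countAndSum(Stream.of(1, 2, 3, 4)) yields [4, 10].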

The type signature I want to have is

public static <T, R> Collector<T, ?, List<R>> list(
      final List<Collector<T, ?, R>> downstreamCollectors);

One way to create such a collector would be to recursively pair up teeing collectors, like so:

public static <T, R> Collector<T, ?, List<R>> list(
    final List<Collector<T, ?, R>> downstreamCollectors) {
  return listrec(
      Collectors.collectingAndThen(downstreamCollectors.get(0), List::of),
      downstreamCollectors.stream().skip(1).toList());
}

private static <T, R> Collector<T, ?, List<R>> listrec(
    final Collector<T, ?, List<R>> teedCollectors,
    final List<Collector<T, ?, R>> downstreamCollectors) {
  if (downstreamCollectors.size() == 0) {
    return teedCollectors;
  } else {
    return listrec(
        teeing(
            teedCollectors,
            downstreamCollectors.get(0),
            (l, s) -> Stream.concat(l.stream(), Stream.of(s)).toList()),
        downstreamCollectors.stream().skip(1).toList());
  }
}

Something feels a little "off" with this solution, so I am trying to create the collector myself, something like:

public static <T, R> Collector<T, ?, List<R>> list2(
    final List<Collector<T, ?, R>> downstreamCollectors) {
  return Collector.of(
      () -> downstreamCollectors.stream().map(c -> c.supplier().get()).toList(),
      (accumulators, t) ->
          IntStream.range(0, downstreamCollectors.size())
              .forEach(
                  i -> downstreamCollectors.get(i).accumulator().accept(accumulators.get(i), t)),
      (accumulator1, accumulator2) ->
          IntStream.range(0, downstreamCollectors.size())
              .mapToObj(
                  i ->
                      downstreamCollectors
                          .get(i)
                          .combiner()
                          .apply(accumulator1.get(i), accumulator2.get(i)))
              .toList(),
      accumulators ->
          IntStream.range(0, downstreamCollectors.size())
              .mapToObj(i -> downstreamCollectors.get(i).finisher().apply(accumulators.get(i)))
              .toList());
}

Because of the unbounded wildcard in the downstream collectors' accumulator type, this doesn't compile. Changing the type signature to

public static <T, A, R> Collector<? super T, ?, List<R>> list2(
    final List<Collector<? super T, A, R>> downstreamCollectors);

solves the problem, but unfortunately makes the method much less usable: all downstream collectors must then share the same accumulator type A, while downstream collectors (like the built-in collectors from java.util.stream.Collectors) typically have an unbounded wildcard in the accumulator type.
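Here is where the compile error comes from. Every expression such as downstreamCollectors.get(i).accumulator() gets its own fresh capture of the ? wildcard. The compiler therefore cannot prove that this accumulator accepts the container created by a separate downstreamCollectors.get(i).supplier() call. For a single collector, the usual workaround is a generic helper method that captures the wildcard once as a named type variable. The sketch below shows that idiom; the class and method names are hypothetical. It does not carry over to a list, because one type variable can only capture one accumulator type.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CaptureHelperDemo {
    // Entry point with the wildcard in the signature, like the desired API.
    static <T, R> R collectManually(Collector<T, ?, R> collector, Iterable<? extends T> items) {
        // Passing the collector to a generic method captures the wildcard once, as A.
        return collectCaptured(collector, items);
    }

    // Inside the helper, supplier, accumulator and finisher all agree on A,
    // so driving one collector by hand type-checks (sequentially, no combiner).
    private static <T, A, R> R collectCaptured(Collector<T, A, R> collector, Iterable<? extends T> items) {
        A container = collector.supplier().get();
        for (T item : items) {
            collector.accumulator().accept(container, item);
        }
        return collector.finisher().apply(container);
    }

    // Usage with a built-in collector whose accumulator type is a wildcard.
    static String joinAll(List<String> words) {
        return collectManually(Collectors.joining(","), words);
    }
}
```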

Is there another way to implement this, keeping the wildcard in the method signature?

I am using OpenJDK 17.0.2.


Solution

  • Handling a list of collectors with arbitrary accumulator types as a flat list can’t be done in a type-safe way, as it would require declaring n type variables to capture these types, where n is the actual list size.

    Therefore, you can only implement the processing as a composition of operations, each with a finite number of components known at compile time, like your recursive approach.

    This still leaves room for simplifications, like replacing downstreamCollectors.size() == 0 with downstreamCollectors.isEmpty(), or downstreamCollectors.stream().skip(1).toList() with the copy-free downstreamCollectors.subList(1, downstreamCollectors.size()).
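    Applying just these two simplifications to the question’s recursive code might look like the following sketch. The class name RecursiveTee and the sample collectors (sum, maximum and product of Integers) are illustrative only. The recursion itself is unchanged.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class RecursiveTee {
    static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> downstreamCollectors) {
        return listrec(
                Collectors.collectingAndThen(downstreamCollectors.get(0), List::of),
                downstreamCollectors.subList(1, downstreamCollectors.size()));
    }

    private static <T, R> Collector<T, ?, List<R>> listrec(
            Collector<T, ?, List<R>> teedCollectors,
            List<Collector<T, ?, R>> downstreamCollectors) {
        if (downstreamCollectors.isEmpty()) { // instead of size() == 0
            return teedCollectors;
        }
        return listrec(
                Collectors.teeing(
                        teedCollectors,
                        downstreamCollectors.get(0),
                        (l, s) -> Stream.concat(l.stream(), Stream.of(s)).toList()),
                // subList returns a view, so unlike stream().skip(1).toList() nothing is copied
                downstreamCollectors.subList(1, downstreamCollectors.size()));
    }

    // Usage: three Integer -> Integer collectors applied in a single pass.
    static List<Integer> sumMaxProduct(List<Integer> numbers) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        Collector<Integer, ?, Integer> max = Collectors.reducing(0, Integer::max);
        Collector<Integer, ?, Integer> product = Collectors.reducing(1, (a, b) -> a * b);
        List<Collector<Integer, ?, Integer>> collectors = List.of(sum, max, product);
        return numbers.stream().collect(list(collectors));
    }
}
```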

    But the biggest improvement comes from replacing the recursive code with a Stream reduction operation:

    public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        return collectors.stream()
                .<Collector<T, ?, List<R>>>map(c-> Collectors.collectingAndThen(c, List::of))
                .reduce((c1, c2) -> teeing(c1, c2,
                            (l1, l2) -> Stream.concat(l1.stream(), l2.stream()).toList()))
                .orElseThrow(() -> new IllegalArgumentException("no collector specified"));
    }
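    To try the reduction-based version in isolation, it can be wrapped in a small class. The class name ReducingTee and the sample collectors below are illustrative choices, not part of the answer. The sample collectors are first stored in variables of type Collector<Integer, ?, Integer>, so the list has exactly the element type the method expects.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ReducingTee {
    // Same reduction as above, with the static import of teeing spelled out:
    // each collector yields a one-element list, then neighbours are teed pairwise.
    static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        return collectors.stream()
                .<Collector<T, ?, List<R>>>map(c -> Collectors.collectingAndThen(c, List::of))
                .reduce((c1, c2) -> Collectors.teeing(c1, c2,
                        (l1, l2) -> Stream.concat(l1.stream(), l2.stream()).toList()))
                .orElseThrow(() -> new IllegalArgumentException("no collector specified"));
    }

    // Usage: sum, maximum and product of the same Integers in one pass.
    static List<Integer> sumMaxProduct(List<Integer> numbers) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        Collector<Integer, ?, Integer> max = Collectors.reducing(0, Integer::max);
        Collector<Integer, ?, Integer> product = Collectors.reducing(1, (a, b) -> a * b);
        List<Collector<Integer, ?, Integer>> collectors = List.of(sum, max, product);
        return numbers.stream().collect(list(collectors));
    }
}
```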
    

    This may work fairly well if you don’t have a really large number of collectors to compose. A disadvantage of this concise solution is that every result is wrapped into a single-element list before the actual merging of results, and the merging itself may involve multiple list-copying operations.

    This result processing can be optimized using

    public static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        int num = collectors.size();
        switch(num) {
            case 0: throw new IllegalArgumentException("no collector specified");
            case 1: return collectingAndThen(collectors.get(0), List::of);
            case 2: return teeing(collectors.get(0), collectors.get(1), List::of);
            case 3: return teeing(teeing(collectors.get(0), collectors.get(1), List::of),
                               collectors.get(2), (l,r) -> List.of(l.get(0), l.get(1), r));
            default:
        }
        Collector<T,?,List<R>> c = teeing(collectors.get(0), collectors.get(1), (r1, r2) -> {
            var list = new ArrayList<R>(num);
            list.add(r1);
            list.add(r2);
            return list;
        });
        for(int ix = 2; ix < num; ix ++) {
            c = teeing(c, collectors.get(ix), (list, r) -> { list.add(r); return list; });
        }
        return collectingAndThen(c, List::copyOf);
    }
    

    This provides special cases for small numbers of collectors, whose results can be used to construct an immutable result list directly. For the other cases, all results are first added to an ArrayList, which avoids excessive list copying, and that list is then converted to the final immutable list. This last step could be omitted if getting an immutable result list is not important; I just tried to stay as close as possible to the Stream.toList() behavior of the original approach.
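    The sketch below wraps the optimized version in a class (hypothetical name OptimizedTee) with the static imports spelled out. It adds a small usage helper so the different code paths can be exercised. One to three collectors go through the special cases, and four or five go through the ArrayList path. In every case the result should be an unmodifiable list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class OptimizedTee {
    static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        int num = collectors.size();
        switch (num) {
            case 0: throw new IllegalArgumentException("no collector specified");
            case 1: return Collectors.collectingAndThen(collectors.get(0), List::of);
            case 2: return Collectors.teeing(collectors.get(0), collectors.get(1), List::of);
            case 3: return Collectors.teeing(
                    Collectors.teeing(collectors.get(0), collectors.get(1), List::of),
                    collectors.get(2), (l, r) -> List.of(l.get(0), l.get(1), r));
            default:
        }
        // Four or more collectors: fill one ArrayList, copy it once at the end.
        Collector<T, ?, List<R>> c = Collectors.teeing(collectors.get(0), collectors.get(1), (r1, r2) -> {
            var list = new ArrayList<R>(num);
            list.add(r1);
            list.add(r2);
            return list;
        });
        for (int ix = 2; ix < num; ix++) {
            c = Collectors.teeing(c, collectors.get(ix), (list, r) -> { list.add(r); return list; });
        }
        return Collectors.collectingAndThen(c, List::copyOf);
    }

    // Usage: the first n (1..5) of five Integer -> Integer collectors.
    static List<Integer> firstN(int n, List<Integer> numbers) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        Collector<Integer, ?, Integer> max = Collectors.reducing(0, Integer::max);
        Collector<Integer, ?, Integer> product = Collectors.reducing(1, (a, b) -> a * b);
        Collector<Integer, ?, Integer> min = Collectors.reducing(Integer.MAX_VALUE, Integer::min);
        Collector<Integer, ?, Integer> squares = Collectors.summingInt(i -> i * i);
        List<Collector<Integer, ?, Integer>> all = List.of(sum, max, product, min, squares);
        return numbers.stream().collect(list(all.subList(0, n)));
    }
}
```

    Calling add on any of these results throws UnsupportedOperationException, as it would for a list returned by Stream.toList().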

    There’s still an unbalanced recursive structure behind the scenes during the Stream processing: each additional collector nests the teeing collectors one level deeper, which prohibits really large numbers of collectors. There are two approaches to solve this.

    1. Implement your own type-safe variant of teeing that exposes the intermediate container type. This allows building a balanced tree and collecting all results into a list by traversing this tree, without additional intermediate storage.

    2. Abandon type safety and implement the collector with a flat list and raw types. Try to limit the unsafe code as much as possible.
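    A minimal sketch of the second approach follows, assuming it is acceptable to suppress unchecked warnings in a single place. It is not code from the answer, and the class name FlatListCollector and the sample collectors are hypothetical. The downstream collectors are viewed through a raw-typed list exactly once. The per-collector containers live in an Object[], so the combiner and finisher can walk all containers by index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatListCollector {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        if (collectors.isEmpty()) {
            throw new IllegalArgumentException("no collector specified");
        }
        // The only unsafe step: treat every accumulator type as Object.
        List<Collector<T, Object, R>> cs = (List) List.copyOf(collectors);
        int n = cs.size();
        return Collector.<T, Object[], List<R>>of(
                () -> {                                // one container per collector
                    Object[] containers = new Object[n];
                    for (int i = 0; i < n; i++) {
                        containers[i] = cs.get(i).supplier().get();
                    }
                    return containers;
                },
                (containers, t) -> {                   // feed t to every collector
                    for (int i = 0; i < n; i++) {
                        cs.get(i).accumulator().accept(containers[i], t);
                    }
                },
                (a, b) -> {                            // merge partial containers index by index
                    for (int i = 0; i < n; i++) {
                        a[i] = cs.get(i).combiner().apply(a[i], b[i]);
                    }
                    return a;
                },
                containers -> {                        // finish each collector in input order
                    List<R> results = new ArrayList<>(n);
                    for (int i = 0; i < n; i++) {
                        results.add(cs.get(i).finisher().apply(containers[i]));
                    }
                    return List.copyOf(results);       // unmodifiable, rejects null results
                });
    }

    // Usage: sum, maximum and product of the same Integers, optionally in parallel.
    static List<Integer> sumMaxProduct(List<Integer> numbers, boolean parallel) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        Collector<Integer, ?, Integer> max = Collectors.reducing(0, Integer::max);
        Collector<Integer, ?, Integer> product = Collectors.reducing(1, (a, b) -> a * b);
        List<Collector<Integer, ?, Integer>> collectors = List.of(sum, max, product);
        Stream<Integer> stream = parallel ? numbers.parallelStream() : numbers.stream();
        return stream.collect(list(collectors));
    }
}
```

    Because each combiner result is stored back into the array, this also works for downstream collectors whose combiner returns a new container instead of mutating the first one.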

    But this might not be needed if you have an estimate of the expected number of collectors to “tee” and find that the first solution works well enough.
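    If only the depth of the nested teeing structure is a concern, a middle ground is to split the collector list in halves recursively. This is not one of the two approaches above. It yields a balanced tree of teeing collectors with a nesting depth of about log2(n). The sketch still copies the partial result lists at each merge, so it does not achieve the storage-free traversal of the first approach. The class name BalancedTee is hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BalancedTee {
    static <T, R> Collector<T, ?, List<R>> list(List<Collector<T, ?, R>> collectors) {
        if (collectors.isEmpty()) {
            throw new IllegalArgumentException("no collector specified");
        }
        return balanced(collectors);
    }

    // Tees the left half with the right half, so nesting depth is about log2(n).
    private static <T, R> Collector<T, ?, List<R>> balanced(List<Collector<T, ?, R>> collectors) {
        if (collectors.size() == 1) {
            return Collectors.collectingAndThen(collectors.get(0), List::of);
        }
        int mid = collectors.size() / 2;
        Collector<T, ?, List<R>> left = balanced(collectors.subList(0, mid));
        Collector<T, ?, List<R>> right = balanced(collectors.subList(mid, collectors.size()));
        // Merging copies both partial lists; results stay in input order.
        return Collectors.teeing(left, right,
                (l, r) -> Stream.concat(l.stream(), r.stream()).toList());
    }

    // Usage: sum, maximum and product of the same Integers.
    static List<Integer> sumMaxProduct(List<Integer> numbers) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        Collector<Integer, ?, Integer> max = Collectors.reducing(0, Integer::max);
        Collector<Integer, ?, Integer> product = Collectors.reducing(1, (a, b) -> a * b);
        List<Collector<Integer, ?, Integer>> collectors = List.of(sum, max, product);
        return numbers.stream().collect(list(collectors));
    }

    // Usage: many identical summing collectors, to try larger counts.
    static List<Integer> manySums(int copies, List<Integer> numbers) {
        Collector<Integer, ?, Integer> sum = Collectors.summingInt(Integer::intValue);
        List<Collector<Integer, ?, Integer>> collectors = Collections.nCopies(copies, sum);
        return numbers.stream().collect(list(collectors));
    }
}
```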