java functional-programming java-stream reduce fold

Folding a sequential stream in Java


I'm used to programming in Scala, but I have to write some Java, and I'm trying to perform the equivalent of the following Scala snippet:

trait Options[K, V] {
  def add(key: K , value: V): Options[K, V]
}

val options: Options[T, U] = ???
val elems: List[(T, U)] = ???
elems.foldLeft(options) {
  case (opts, (key, value)) => opts.add(key, value)
}

That is, I'm folding the elements of elems into options, producing a new instance at each step.

I tried to use Java's Stream#reduce:

interface Options<K, V> {
  Options<K, V> add(K key, V value);
}

Options<K, V> options = ???
Stream<Tuple2<K, V>> elems = ??? // This is Reactor's Tuple2
elems.reduce(options, (opts, opt) -> opts.add(opt.getT1(), opt.getT2()), ???)

I don't know what the combiner should be, and I'm having trouble imagining what values its arguments will have. My understanding is that the combiner is used to combine intermediate values produced in parallel in a parallel stream. I do not care at all about processing elems in parallel in my case. In other words, I'm looking for a synchronous and sequential version of Flux#reduce.

I have no control over the API of Options. elems does not need to be a Stream.


Solution

  • It's not possible to write a meaningful combiner with the interface you've provided. A combiner needs a way to merge two Options instances, but there is no way to do that: the only thing anyone can do with an Options instance is add one pair to it. There is no way to get any information back out of it, so as written it presumably can't do anything very useful.

    Perhaps this issue stems from the fact that Java does not have traits, and Java interfaces are not a suitable substitute for them.

    The idiomatic Java way to write this is just a bog-standard for-loop:

    Options<String, String> options = /*whatever*/;
    List<Pair<String, String>> elems = /*whatever*/;
    for (Pair<String, String> pair : elems)
    {
        options = options.add(pair.getKey(), pair.getValue());
    }
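
    If you need this in more than one place, the loop can be wrapped in a small generic helper that plays the role of Scala's foldLeft. The following is only a sketch: foldLeft and MapOptions are hypothetical names introduced here so the example runs (your real Options implementation will differ), and Map.Entry stands in for whatever pair type you use.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class FoldLeftSketch {

    // Same shape as the interface in the question.
    interface Options<K, V> {
        Options<K, V> add(K key, V value);
    }

    // Hypothetical immutable implementation, present only so the sketch is runnable.
    static final class MapOptions<K, V> implements Options<K, V> {
        private final Map<K, V> entries;

        MapOptions(Map<K, V> entries) {
            this.entries = entries;
        }

        @Override
        public Options<K, V> add(K key, V value) {
            // Copy, then add: the receiver is never modified.
            Map<K, V> copy = new LinkedHashMap<>(entries);
            copy.put(key, value);
            return new MapOptions<>(copy);
        }

        @Override
        public String toString() {
            return entries.toString();
        }
    }

    // Sequential left fold: threads the accumulator through elems in iteration order.
    static <A, T> A foldLeft(Iterable<? extends T> elems, A zero, BiFunction<A, ? super T, A> step) {
        A acc = zero;
        for (T elem : elems) {
            acc = step.apply(acc, elem);
        }
        return acc;
    }

    public static void main(String[] args) {
        Options<String, String> options = new MapOptions<>(Map.of());
        List<Map.Entry<String, String>> elems = List.of(Map.entry("a", "1"), Map.entry("b", "2"));

        Options<String, String> result =
            foldLeft(elems, options, (opts, e) -> opts.add(e.getKey(), e.getValue()));

        System.out.println(result); // prints {a=1, b=2}
    }
}
```

    Because the helper takes an Iterable, it works with any collection and needs no combiner at all.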
    

    If you can accept never being able to use a parallel stream, you can take advantage of the fact that, in practice, a sequential stream never invokes the combiner. As such, you can call the three-argument reduce with a combiner that just throws an exception. Note that the three-argument collect is not suitable here: its accumulator is a BiConsumer meant to mutate the container in place, so the new instance returned by add would be silently discarded.

    Options<String, String> foo = elems.stream()
        .reduce(
            options,
            (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
            (a, b) -> { throw new UnsupportedOperationException(); }
        );
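
    One thing worth checking when Options is immutable, as the question implies: the three-argument collect performs a mutable reduction and ignores whatever the accumulator returns, whereas the three-argument reduce keeps the returned value. The sketch below contrasts the two with a throwing combiner. MapOptions is a hypothetical stand-in implementation, and the claim that the combiner is never called rests on how sequential streams behave in practice rather than on a documented guarantee.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SequentialReduceSketch {

    interface Options<K, V> {
        Options<K, V> add(K key, V value);
    }

    // Hypothetical immutable implementation, present only so the sketch is runnable.
    static final class MapOptions<K, V> implements Options<K, V> {
        private final Map<K, V> entries;

        MapOptions(Map<K, V> entries) {
            this.entries = entries;
        }

        @Override
        public Options<K, V> add(K key, V value) {
            Map<K, V> copy = new LinkedHashMap<>(entries);
            copy.put(key, value);
            return new MapOptions<>(copy);
        }

        @Override
        public String toString() {
            return entries.toString();
        }
    }

    // collect: the accumulator is a BiConsumer, so the instance returned by add is dropped.
    static Options<String, String> viaCollect(Options<String, String> options,
                                              List<Map.Entry<String, String>> elems) {
        return elems.stream().collect(
            () -> options,
            (opt, e) -> opt.add(e.getKey(), e.getValue()),
            (a, b) -> { throw new UnsupportedOperationException(); });
    }

    // reduce: the accumulator is a BiFunction, so each new instance becomes the next state.
    static Options<String, String> viaReduce(Options<String, String> options,
                                             List<Map.Entry<String, String>> elems) {
        return elems.stream().reduce(
            options,
            (opt, e) -> opt.add(e.getKey(), e.getValue()),
            (a, b) -> { throw new UnsupportedOperationException("sequential streams only"); });
    }

    public static void main(String[] args) {
        Options<String, String> options = new MapOptions<>(Map.of());
        List<Map.Entry<String, String>> elems = List.of(Map.entry("a", "1"), Map.entry("b", "2"));

        System.out.println(viaCollect(options, elems)); // prints {} (the additions are lost)
        System.out.println(viaReduce(options, elems));  // prints {a=1, b=2}
    }
}
```

    Strictly speaking, this reduce call does not satisfy the documented requirements on the identity and combiner, so treat it as a pragmatic shortcut for sequential streams only.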
    

    If you want a reduce that also works on parallel streams, you'll need to modify your interface to either expose some information about the key-value pairs it contains or provide a means to add more than one key-value pair at once. For example:

    interface Options<K, V>
    {
        Options<K, V> add(K key, V value);
        Options<K, V> add(Options<K, V> otherOptions);
    }
    
    Options<String, String> options = /*whatever*/;
    List<Pair<String, String>> elems = /*whatever*/;
    
    Options<String, String> foo = elems.stream()
        .reduce(
            options,
            (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
            Options::add
        );
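
    To make the merge variant concrete, here is a sketch of how such an interface could be implemented and used with a parallel stream. Everything beyond the two add methods is an assumption made for illustration: the entries() accessor and the MapOptions class are hypothetical, and "later pairs override earlier ones" is just one possible merge policy. The sketch also reduces from an empty Options and merges the result into options afterwards, because reduce expects its identity to be a neutral element for the combiner.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MergeableOptionsSketch {

    interface Options<K, V> {
        Options<K, V> add(K key, V value);
        Options<K, V> add(Options<K, V> otherOptions);
        Map<K, V> entries(); // assumption: read access, needed to implement the merge
    }

    // Hypothetical immutable implementation, present only so the sketch is runnable.
    static final class MapOptions<K, V> implements Options<K, V> {
        private final Map<K, V> entries;

        MapOptions(Map<K, V> entries) {
            this.entries = entries;
        }

        @Override
        public Options<K, V> add(K key, V value) {
            Map<K, V> copy = new LinkedHashMap<>(entries);
            copy.put(key, value);
            return new MapOptions<>(copy);
        }

        @Override
        public Options<K, V> add(Options<K, V> otherOptions) {
            // Pairs from otherOptions win on key collisions.
            Map<K, V> copy = new LinkedHashMap<>(entries);
            copy.putAll(otherOptions.entries());
            return new MapOptions<>(copy);
        }

        @Override
        public Map<K, V> entries() {
            return Collections.unmodifiableMap(entries);
        }
    }

    static Options<String, String> fold(Options<String, String> options,
                                        List<Map.Entry<String, String>> elems) {
        // An empty Options is a true identity for the merge, unlike a pre-populated one.
        Options<String, String> empty = new MapOptions<String, String>(Map.of());
        Options<String, String> fromElems = elems.parallelStream()
            .reduce(
                empty,
                (opt, e) -> opt.add(e.getKey(), e.getValue()),
                Options::add);
        // Apply the folded pairs on top of the starting options.
        return options.add(fromElems);
    }

    public static void main(String[] args) {
        Options<String, String> options = new MapOptions<String, String>(Map.of("x", "0"));
        List<Map.Entry<String, String>> elems = List.of(Map.entry("a", "1"), Map.entry("b", "2"));
        System.out.println(fold(options, elems).entries());
    }
}
```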
    

    I doubt that's what you wanted to hear, but Scala and Java are different languages. You shouldn't expect everything in one to have an exact parallel in the other. If it did, there would be no reason for both languages to exist in the first place.