Search code examples
javaspring-bootspring-webfluxreactive

Java generics and reactive paradigm


I'm new to this reactive programming and I'm a little bit lost. I have a simple factory that retrieves the required class that implements a generic interface:

public interface InterfaceToBeImplemented<T> {

    Flux<Tuple2<KafkaEvent<T>, String>> method(List<String> events);
}

public class InterfaceImplemntationOne implements InterfaceToBeImplented<ConcreteObject> {

    @Override
    public Flux<Tuple2<KafkaEvent<ConcreteObject>, String>> method(List<String> events) {
        ....
    }
}

And a service with a factory that retrieves the required implementation calls the method and returns the result:

public class Factory {

    private InterfaceImplementationOne interfaceImplOne;

    public InterfaceToBeImplemented<?> retrieve(SomeClassifier someClassifier) {
        return switch (someClassifier) {
            case 1 -> imterfaceImplOne;
            default -> throw new RuntimeException("Some message");
        };
    }


public class Service {

    private Factory factory;

    public Flux<Tuple2<KafkaEvent<?>, String>> processEvent(List<String> events) {
        return factory.retrieve(eventClassifier)
                .method(events);
    }

This is not working. I get the message:

Required type: Flux<Tuple2<KafkaEvent<?>, String>>
Provided: Flux<Tuple2<? extends KafkaEvent<?>, String>>

And if I change the return type of processEvent I still get the message:

Required type: Flux<Tuple2<? extends KafkaEvent<?>, String>>
Provided: Flux<Tuple2<? extends KafkaEvent<?>, String>>

Solution

  • That question is related to generics only.

    You have to change signature like this:

    public Flux<? extends Tuple2<? extends KafkaEvent<?>, String>> processEvent(List<String> events) {
        return factory.retrieve(new SomeClassifier()).method(events);
    }
    

    To understand it, let's compare List<List<?>> vs List<? extends List<?>>:

            // a List that works with any kind of lists
            List<List<?>> omnipotentList = new ArrayList<>(); 
            omnipotentList.add(List.of("")); // List<String> is added
            omnipotentList.add(List.of(1)); // List<Integer> is added
    
            // Wildcard is captured.
            // A List that works with some particular lists (for example, List<String>, ArrayList<Integer>, Vector<Optional<Boolean>>),
            // it is fixed, but we don't know what is it exactly.
            List<? extends List<?>> listOfSpecificLists = new ArrayList<>();
            listOfSpecificLists.add(List.of("")); // doesn't compile, listOfSpecificLists works with some specific list, we aren't sure if List<String> is applicable
            listOfSpecificLists.add(List.of(1)); // doesn't compile, listOfSpecificLists works with some specific list, we aren't sure if List<Integer> is applicable
    

    In your case Flux<Tuple2<KafkaEvent<?>, String>> represents "omnipotent" flux, that works with tuples, parametrized with any kind of KafkaEvent. While factory.retrieve(new SomeClassifier()).method(events) returns you a flux, that works with specific tuples of KafkaEvents, that are parametrized with a specific class. It depends on classifier, factory, interface implementation, etc. It's lost what KafkaEvent is parametrized exactly with, but it's captured. So that flux is not an "omnipotent" flux.

    Defining signature as Flux<? extends ...> tells the compiler that is Flux of something specific.

    For more explanation take a look, for example, at this answer. Especially at "Why ? extends is necessary" section.