Search code examples
javaakkaakka-stream

Keep materialized value type in akka stream divertTo or alsoTo


I try to create a kafka consumer stream which has a Source of type Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control.

from this source I want to use different pathes based on some predicates. Therefore I'd like to use the divertTo method and sometimes the alsoTo method.

Both methods are accepting a Sink. My problem is that when I build this Sink I want to have the Sink to have the materialzed value type Consumer.Control which is provided by the source. What I'm doing right now is building the sink like this

private Sink<SomeType, NotUsed> sinkForPathA(){
  Flow.of(SomeType.class)
    .to(Committer.sink(committerSettings));
}

As you may noticed the materialized value type is now NotUsed which is not desired. What I'd like to have is this:

private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
  Flow.of(SomeType.class)
    .toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}

Is it somehow possible to create a Flow with a predefined materialized value type and not just NotUsed?


Solution

  • You can certainly create a Flow with materialized value of the inner Sink.

    Taking Sink.seq() which has materialized value type: CompletionStage<List<T>> as an example.

    If you say create a Flow like this:

    Flow.of(Integer.class)
        .to(Sink.seq());
    

    then as you pointed out you'll ge NotUsed in the returned Sink. The trick is to use toMat with keepRight to retain the materialized value type of the Sink. It would go like this:

    Flow.of(Integer.class)
        .toMat(Sink.seq(), Keep.right());
    

    and now you'll get Sink<Integer, CompletionStage<List<Integer>>> as a result.