I try to create a kafka consumer stream which has a Source of type
Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control
.
from this source I want to use different pathes based on some predicates.
Therefore I'd like to use the divertTo
method and sometimes the alsoTo
method.
Both methods are accepting a Sink. My problem is that when I build this Sink I want to have the Sink to have the materialzed value type Consumer.Control
which is provided by the source.
What I'm doing right now is building the sink like this
private Sink<SomeType, NotUsed> sinkForPathA(){
Flow.of(SomeType.class)
.to(Committer.sink(committerSettings));
}
As you may noticed the materialized value type is now NotUsed which is not desired. What I'd like to have is this:
private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
Flow.of(SomeType.class)
.toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}
Is it somehow possible to create a Flow with a predefined materialized value type and not just NotUsed
?
You can certainly create a Flow
with materialized value of the inner Sink
.
Taking Sink.seq()
which has materialized value type: CompletionStage<List<T>>
as an example.
If you say create a Flow
like this:
Flow.of(Integer.class)
.to(Sink.seq());
then as you pointed out you'll ge NotUsed
in the returned Sink
. The trick is to use toMat
with keepRight
to retain the materialized value type of the Sink
. It would go like this:
Flow.of(Integer.class)
.toMat(Sink.seq(), Keep.right());
and now you'll get Sink<Integer, CompletionStage<List<Integer>>>
as a result.