@Incoming("from-processor-to-consumer")
public Multi<Void> consume(Multi<String> stream) {
    return stream.onItem()
        .invoke(msg -> {
            log.infof("consumer received %s", msg);
        })
        .onItem()
        .ignore();
}
Running this method raises a ClassCastException:
consume has thrown an exception: java.lang.ClassCastException: class java.lang.String cannot be cast to class io.smallrye.mutiny.Multi
Can I consume a Multi<String>?
Looking at the SmallRye Reactive Messaging documentation (Development Model > Consuming Payloads), it seems I can only consume one message at a time.
First, I must admit that the error message is useless and should be improved.
The problem is your method signature.
@Incoming("from-processor-to-consumer")
@Outgoing("this-is-missing-in-your-code")
public Multi<Void> consume(Multi<String> stream) {
    return stream.onItem()
        .invoke(msg -> {
            log.infof("consumer received %s", msg);
        })
        .onItem()
        .ignore();
}
The @Outgoing annotation is missing. When a method returns a Multi, you need to say where that stream goes, that is, which channel it writes to.
If your method is terminal (meaning it is the final consumer), you cannot inject a Multi. The workaround is to send the stream to an intermediate channel and consume that channel with a simple no-op method:
@Incoming("from-processor-to-consumer")
@Outgoing("this-is-missing-in-your-code")
public Multi<String> consume(Multi<String> stream) {
    return stream.onItem()
        .invoke(msg -> {
            log.infof("consumer received %s", msg);
        });
}

@Incoming("this-is-missing-in-your-code")
void noop(String s) { }
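
To make the shape of this workaround easier to picture, here is a rough model of the same topology (source, logging pass-through stage, no-op sink) built only on the JDK's java.util.concurrent.Flow API. It is not SmallRye's API and says nothing about how SmallRye wires channels internally; the names PassThroughSketch, LoggingStage, NoopSink and run are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone model; none of these types come from SmallRye.
final class PassThroughSketch {

    // Middle stage: logs each item and forwards it downstream,
    // playing the role of the @Incoming/@Outgoing method above.
    static final class LoggingStage extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        final List<String> seen = new CopyOnWriteArrayList<>();
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);                 // ask for the first item
        }
        @Override public void onNext(String item) {
            System.out.printf("consumer received %s%n", item);
            seen.add(item);
            submit(item);                 // forward to whoever subscribed downstream
            upstream.request(1);          // ask for the next item
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    // Terminal stage: discards every item, playing the role of noop(String).
    static final class NoopSink implements Flow.Subscriber<String> {
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { /* intentionally empty */ }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Pushes the messages through source -> stage -> sink and returns what the stage logged.
    static List<String> run(List<String> messages) {
        LoggingStage stage = new LoggingStage();
        NoopSink sink = new NoopSink();
        stage.subscribe(sink);
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(stage);
            messages.forEach(source::submit);
        } // closing the source signals completion after the submitted items
        try {
            if (!sink.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(stage.seen);
    }
}
```

Calling PassThroughSketch.run(List.of("a", "b", "c")) pushes three items through the stage while the sink silently drains them. In this model the stage has no say over who consumes its output; it only forwards, which mirrors why the Multi-returning method needs an @Outgoing channel and something at the end of that channel.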