I'm working on a project that needs to receive an event from GCP Pub/Sub, download a file, parse and process it, and finally publish the results to a Kafka broker, but I'm getting SRMSG00034: Insufficient downstream requests to emit item.
In my first attempt to publish to Kafka I iterated over msgList with a stream, but then I read that Mutiny (https://quarkus.io/blog/mutiny-back-pressure/) can handle back-pressure. However, I'm still getting the same error.
In my scenario I have to publish two different lists, one of which has about 10k messages. I read that I can control the overflow behavior with @OnOverflow, but I'd prefer to keep the default configuration unless a change is really necessary.
Multi.createFrom().iterable(msgList)
     .onItem().transform(item -> {
         // ... some transformation ...
     })
     .onItem().invoke(emitter::send)
     .subscribe().with(
         item -> Uni.createFrom().voidItem(),
         Throwable::printStackTrace,
         () -> System.out.println("Done!")
     );
Could you point me in the right direction to solve this issue?
Thanks in advance!
emitter.send is an async method. In your code, the outcome is ignored and send returns immediately, which is probably not what you want. I would recommend using a MutinyEmitter and doing onItem().call(emitter::send). In that case, you wait for each message to be sent, and if sending a message fails, the failure is propagated. With that, back-pressure is applied, as a new message is only requested once the previous one has been acknowledged.
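To make the shape of that pattern concrete, here is a minimal sketch using Mutiny core only. The send method below is a hypothetical stand-in for MutinyEmitter.send: it returns a Uni&lt;Void&gt; that completes when the message would be acknowledged (here, immediately), and a counter records the acks:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CallDemo {

    static final AtomicInteger acked = new AtomicInteger();

    // Hypothetical stand-in for MutinyEmitter.send(...): the Uni completes
    // once the broker has acknowledged the message (synchronously here).
    static Uni<Void> send(String msg) {
        return Uni.createFrom().voidItem()
                  .onItem().invoke(acked::incrementAndGet);
    }

    public static int run(List<String> msgList) {
        Multi.createFrom().iterable(msgList)
             .onItem().transform(String::toUpperCase)   // ... some transformation ...
             // call(...) waits for each send's Uni before requesting the next item,
             // so back-pressure is applied one message at a time.
             .onItem().call(CallDemo::send)
             .subscribe().with(
                 item -> { /* item sent and acknowledged */ },
                 Throwable::printStackTrace,
                 () -> System.out.println("Done!"));
        return acked.get();
    }
}
```

Because each item only flows once the previous send's Uni completes, the stream never outruns the emitter, which is exactly what avoids SRMSG00034.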
If you want to send a batch of messages, use .group().intoLists().of(...) and send all the messages from the resulting list. However, as before, you need to wait for the acks of all the messages in the batch; Uni.join lets you do that.
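Here is a sketch of the batched variant under the same assumption (a stand-in send returning Uni&lt;Void&gt;); each batch is sent with Uni.join so the next batch is only requested once every ack in the current one has arrived:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class BatchDemo {

    static final AtomicInteger acked = new AtomicInteger();

    // Hypothetical stand-in for MutinyEmitter.send(...).
    static Uni<Void> send(Integer msg) {
        return Uni.createFrom().voidItem()
                  .onItem().invoke(acked::incrementAndGet);
    }

    public static int run(int count, int batchSize) {
        Multi.createFrom().range(0, count)
             .group().intoLists().of(batchSize)         // Multi<List<Integer>>
             // Send the whole batch, then wait for every ack before the next batch.
             .onItem().call(batch -> {
                 List<Uni<Void>> sends =
                     batch.stream().map(BatchDemo::send).collect(Collectors.toList());
                 return Uni.join().all(sends).andFailFast();
             })
             .subscribe().with(batch -> {}, Throwable::printStackTrace);
        return acked.get();
    }
}
```

andFailFast() propagates the first failure immediately; andCollectFailures() would instead wait for all sends and aggregate the failures.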
If you want to send messages concurrently but with constrained concurrency, use:
.onItem().transformToUni(m -> emitter.send(m)).merge(concurrency)
It sends up to concurrency messages at the same time. However, ordering is not guaranteed in that case.
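A sketch of the bounded-concurrency variant, again with a hypothetical stand-in send (here it echoes the message so the downstream Multi carries items):

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class MergeDemo {

    static final AtomicInteger acked = new AtomicInteger();

    // Hypothetical stand-in for MutinyEmitter.send(...), echoing the message.
    static Uni<Integer> send(Integer msg) {
        return Uni.createFrom().item(msg)
                  .onItem().invoke(acked::incrementAndGet);
    }

    public static int run(int count, int concurrency) {
        Multi.createFrom().range(0, count)
             .onItem().transformToUni(MergeDemo::send)
             // Up to `concurrency` sends are in flight at once; merge(...) does not
             // preserve order (concatenate() would, at the cost of concurrency).
             .merge(concurrency)
             .collect().asList()
             .await().atMost(Duration.ofSeconds(5));
        return acked.get();
    }
}
```

merge(concurrency) is the middle ground between call (one at a time, ordered) and an unbounded merge (maximum throughput, no back-pressure on the emitter).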