I want a REST endpoint that dynamically accepts a topic as a query param and responds with a success/failure result, e.g. https://myendpoint/publishToKafka?topic=myDynamicTopic
In Quarkus with SmallRye Reactive Messaging, the code would look something like the snippet below, wrapping the payload in a Message that carries OutgoingKafkaRecordMetadata:
@Channel("test")
Emitter<byte []> kafkaEmitter;
@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte [] payload){
kafkaEmitter.send(Message.of(payload).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
.withKey("my-key")
.withTopic("myDynamicTopic")
.build()));
}
From the Quarkus documentation: "If the endpoint does not return a CompletionStage, the HTTP response may be written before the message is sent to Kafka, and so failures won’t be reported to the user." The example there covers sending a payload directly: emitter.send(payload) returns a CompletionStage, whereas emitter.send(message) returns void. Sending the bare payload, however, requires configuring the topic in advance. Is it possible to specify metadata with a Message and still respond to the calling client with a success/failure response? (I don't mind whether it's with Emitter and CompletionStage or with MutinyEmitter and Uni.)
Any advice or suggestions would be appreciated.
Because you use a Message (as you need to specify the topic), you need something a bit more convoluted:
@Channel("test")
Emitter<byte[]> kafkaEmitter;

@POST
@Path("/publishToKafka")
public CompletionStage<Void> publishRecord(@QueryParam("topic") String topic, byte[] payload) {
    // Completed by the ack/nack callbacks below and returned to the caller as the outcome.
    CompletableFuture<Void> future = new CompletableFuture<>();
    Message<byte[]> message = Message.of(payload)
        .addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
            .withKey("my-key")
            .withTopic(topic) // topic taken from the query param
            .build());
    message = message
        .withAck(() -> {
            // The broker accepted the record: report success.
            future.complete(null);
            return CompletableFuture.completedFuture(null);
        })
        .withNack(t -> {
            // The send failed: report the failure; the nack itself was handled fine.
            future.completeExceptionally(t);
            return CompletableFuture.completedFuture(null);
        });
    kafkaEmitter.send(message);
    return future;
}
In this snippet, I also attach the ack and nack handlers, which are called when the message is either acknowledged (accepted by the broker) or rejected (something went wrong).
These callbacks report to future, a CompletableFuture created in the method. This is the object to return, as it does what you want: it indicates the outcome.
I know the callbacks are slightly complicated. This is mainly due to the spec: we have to return CompletableFuture.completedFuture(...) to acknowledge that the nack process itself was successful. If we returned future instead (which we have just completed with future.completeExceptionally(t)), this would be interpreted as a failure during the nack process. It would basically be the equivalent of a throw within a catch block in the imperative world.
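To make the distinction between the two futures concrete, here is a minimal sketch that runs on the plain JDK, without Quarkus or Kafka. FakeMessage and bridgedTo are hypothetical stand-ins invented for this illustration; they only mimic the shape of the ack and nack callbacks and are not part of SmallRye Reactive Messaging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

public class AckNackBridge {

    /** Hypothetical stand-in for a message: it only carries the two callbacks. */
    static final class FakeMessage {
        final Supplier<CompletionStage<Void>> ack;
        final Function<Throwable, CompletionStage<Void>> nack;

        FakeMessage(Supplier<CompletionStage<Void>> ack,
                    Function<Throwable, CompletionStage<Void>> nack) {
            this.ack = ack;
            this.nack = nack;
        }
    }

    /** Builds a message whose callbacks report the send outcome to the given future. */
    static FakeMessage bridgedTo(CompletableFuture<Void> outcome) {
        return new FakeMessage(
            () -> {
                // Success: complete the future the endpoint would return.
                outcome.complete(null);
                // Separate future: tells the framework the ack was processed.
                return CompletableFuture.completedFuture(null);
            },
            failure -> {
                // Failure: propagate the cause to the future the endpoint would return.
                outcome.completeExceptionally(failure);
                // Still a successfully completed future: the nack handling itself worked.
                return CompletableFuture.completedFuture(null);
            });
    }

    public static void main(String[] args) {
        // Simulate a rejected send by invoking the nack callback by hand.
        CompletableFuture<Void> outcome = new CompletableFuture<>();
        CompletionStage<Void> nackResult =
            bridgedTo(outcome).nack.apply(new IllegalStateException("broker unavailable"));

        System.out.println("caller sees failure: " + outcome.isCompletedExceptionally());
        System.out.println("nack handling failed: "
            + nackResult.toCompletableFuture().isCompletedExceptionally());
    }
}
```

The future handed back to the HTTP caller carries the failure, while the future returned from the callback only says whether the callback itself ran cleanly, which is why the two must not be the same object.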
Fortunately, an easier version will be available soonish (no worries, we won't break the current approach).