java apache-kafka quarkus smallrye-reactive-messaging

How do I properly publish tombstone messages to a compacted Kafka topic from within a Quarkus application?


From within a Quarkus application I need to publish tombstone messages to a compacted Apache Kafka topic. As my use case is imperative, I use an Emitter for sending messages to the topic (as suggested in the Quarkus blog). The code for non-tombstone messages (with payload) is:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<MyDataStructure> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    OutgoingKafkaRecordMetadata<String> metadata =
        OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(myData.getTopicKey())
            .build();
    return CompletableFuture.runAsync(
        () -> emitter.send(Message.of(myData).addMetadata(metadata)));
  }
}

The Emitter also implements <M extends Message<? extends T>> void send(M msg), which I hoped would allow me to craft a Message with a null payload as a tombstone message. Unfortunately, all overloads of the Message.of(..) factory method that accept metadata (which is needed to provide the message key) specify that the payload "must not be {@code null}".

What is the proper way (following Quarkus / SmallRye Reactive Messaging concepts) to publish tombstone messages to a Kafka topic using an Emitter?


Solution

  • I would recommend using the Record class (see documentation). A Record is a key/value pair, which represents the key and value of the Kafka record to write. Both can be null, but in your case, only the value part should be null: Record.of(key, null);.

    So, you need to change the type of the Emitter to be Record<Key, Value>, such as:

    @Dependent
    public class Publisher {
    
      @Inject
      @Channel("theChannelName")
      Emitter<Record<Key, MyDataStructure>> emitter;
    
      public CompletionStage<Void> publish(final MyDataStructure myData) {
        return emitter.send(Record.of(myData.getTopicKey(), null));
      }
    }
    

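    While runAsync is convenient, emitters are already asynchronous, so there is no need for it. In addition, runAsync without an explicit executor runs on ForkJoinPool.commonPool(), and its behavior can change drastically in containers: if the common pool's level of parallelism is less than 2 (for example, when only a few CPUs are available), CompletableFuture creates a new thread for each task instead of using the pool.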

    The code above returns the result of the send method, which is a CompletionStage. That stage completes once the record has been written to Kafka and acknowledged by the broker.
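
To illustrate the difference between wrapping the send in runAsync and returning the emitter's own stage, here is a minimal sketch that uses only the JDK. FakeEmitter, publishWrapped and publishDirect are hypothetical names invented for this illustration; FakeEmitter merely imitates an emitter whose send returns a stage that completes on acknowledgement, and is not the SmallRye API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ForkJoinPool;

public class SendCompletionDemo {

    /** Hypothetical stand-in for an emitter; NOT the SmallRye Emitter. */
    static class FakeEmitter {
        private final CompletableFuture<Void> ack = new CompletableFuture<>();

        /** Returns a stage that completes only once acknowledge() is called. */
        CompletionStage<Void> send(String key, String value) {
            return ack;
        }

        /** Simulates the broker acknowledging the record. */
        void acknowledge() {
            ack.complete(null);
        }
    }

    /** Pattern from the question: the stage returned by send() is discarded. */
    static CompletableFuture<Void> publishWrapped(FakeEmitter emitter, String key) {
        return CompletableFuture.runAsync(() -> emitter.send(key, null));
    }

    /** Pattern from the answer: the stage returned by send() is handed to the caller. */
    static CompletableFuture<Void> publishDirect(FakeEmitter emitter, String key) {
        return emitter.send(key, null).toCompletableFuture();
    }

    public static void main(String[] args) {
        // runAsync uses the common pool only if this value is at least 2.
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());

        FakeEmitter emitter = new FakeEmitter();

        CompletableFuture<Void> wrapped = publishWrapped(emitter, "k1");
        wrapped.join(); // returns as soon as the lambda has run, ack or not

        CompletableFuture<Void> direct = publishDirect(emitter, "k1");

        System.out.println("wrapped done before ack: " + wrapped.isDone()); // true
        System.out.println("direct done before ack: " + direct.isDone());   // false

        emitter.acknowledge();
        System.out.println("direct done after ack: " + direct.isDone());    // true
    }
}
```

In this sketch the runAsync variant reports completion as soon as send has been called, whereas the direct variant completes only after the simulated acknowledgement, which is why returning the stage from send gives the caller a more meaningful signal.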