Tags: java, clojure, interface, apache-kafka, reify

Clojure: implementing stateful Java interface


Kafka Streams has an interface, Processor, whose implementations are stateful. Here is an example implementation from the developer guide:

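import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountProcessor implements Processor<String, String> {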

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // call this processor's punctuate() method every 1000 time units.
      this.context.schedule(1000);

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");
  }

  @Override
  public void process(String dummy, String line) {
      String[] words = line.toLowerCase().split(" ");

      for (String word : words) {
          Long oldValue = kvStore.get(word);
          if (oldValue == null) {
              kvStore.put(word, 1L);
          } else {
              kvStore.put(word, oldValue + 1L);
          }
      }
  }

  @Override
  public void punctuate(long timestamp) {
      KeyValueIterator<String, Long> iter = this.kvStore.all();
      while (iter.hasNext()) {
          KeyValue<String, Long> entry = iter.next();
          context.forward(entry.key, entry.value.toString());
      }
      iter.close();
      // commit the current processing progress
      context.commit();
  }

  @Override
  public void close() {
      // close the key-value store
      kvStore.close();
  }

}

The init method sets up WordCountProcessor's internal state: it keeps the ProcessorContext and retrieves the key-value store. The other methods, such as process, punctuate and close, then use this state.

It's not clear to me how to reify such an interface in Clojure. How would we pass on the state retrieved by init to process, close, etc.?

Using a closure?

One idea I have is to use a closure:

(import '(org.apache.kafka.streams.processor Processor))

(let [ctx (atom nil)]
  (reify Processor
    (close [this]
      ;; Do something with @ctx
      nil)
    (init [this context]
      (reset! ctx context))
    (process [this k v]
      ;; Do something with @ctx
      nil)
    (punctuate [this timestamp]
      ;; Do something with @ctx
      nil)))

Annoyingly, we'd have to start from the ProcessorContext every time, so the code that fetches the key-value store would be repeated in every method that needs it.

I don't see a (general) way around that, though on a case-by-case basis we can replace the ctx atom with more specific state that the methods need.
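For reference, here is a minimal runnable sketch of this single-atom closure. It assumes a hypothetical MiniProcessor interface (defined with definterface) standing in for Kafka's Processor, and a plain map from store name to java.util.HashMap standing in for the ProcessorContext, so it runs without Kafka on the classpath. It shows that whatever init stores in the atom is visible to the other methods, and also where the per-call store lookup creeps in.

```clojure
;; MiniProcessor is a hypothetical stand-in for Kafka's Processor interface
;; (init/process/close only), so the sketch runs without Kafka.
(definterface MiniProcessor
  (init [context])
  (process [k v])
  (close []))

(def processor
  ;; The single ctx atom is captured by every method of the reify, so the
  ;; context stored by init is visible to process and close.
  (let [ctx (atom nil)]
    (reify MiniProcessor
      (init [this context]
        (reset! ctx context))
      (process [this _ word]
        ;; The store is looked up from the context on every call; this is
        ;; the repetition described above. The context is assumed to be a
        ;; map from store name to java.util.HashMap.
        (let [store (get @ctx "Counts")]
          (.put store word (inc (or (.get store word) 0)))))
      (close [this]
        (reset! ctx nil)))))
```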

Is there a better way?


Solution

  • Closing over an atom is the main way to do it. Your original class has two fields, so you can close over two atoms to get the same effect:

    (let [ctx (atom nil)
          kv-store (atom nil)]
      (reify Processor
        ,,,
        (init [this context]
          (reset! ctx context)
          (reset! kv-store (.getStateStore context "Counts")))
        ,,,))
    

    If that's still too tedious, you can add some convenience functions that also close over the atoms:

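    (let [ctx      (atom nil)
          kv-store (atom nil)
          ;; let-bound helpers close over the same atoms as the reify below
          ;; (def/defn inside a let would create global vars instead)
          kv-get   (fn [key] (.get @kv-store key))
          kv-put   (fn [key value] (.put @kv-store key value))
          ;; realize the entries and close the store iterator, as the Java code does
          kv-all   (fn []
                     (with-open [iter (.all @kv-store)]
                       (vec (iterator-seq iter))))]
    
      (reify Processor
        ,,,
        (init [this context]
          (reset! ctx context)
          (reset! kv-store (.getStateStore context "Counts")))
        ,,,
        (punctuate [this timestamp]
          (doseq [x (kv-all)]
            ,,,))))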
    

    The alternative would be to use gen-class, but I think you'll be better off with reify.
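Putting the pieces of the answer together, here is a minimal runnable sketch of the two-atom approach with helper functions. It assumes a hypothetical MiniProcessor interface standing in for Kafka's Processor (punctuate omitted) and a plain map from store name to java.util.HashMap standing in for the ProcessorContext and its state store. It also wraps the let in a function (word-count-processor, likewise hypothetical), so each call returns a processor with its own atoms. That matters because Kafka Streams can create several processor instances (one per stream task), and they should not share state.

```clojure
(require '[clojure.string :as str])

;; MiniProcessor is a hypothetical stand-in for Kafka's Processor interface
;; (punctuate omitted), so the sketch runs without Kafka.
(definterface MiniProcessor
  (init [context])
  (process [k v])
  (close []))

(defn word-count-processor
  "Returns a fresh processor with its own atoms. The context passed to init is
  assumed to be a map from store name to java.util.HashMap, standing in for
  ProcessorContext.getStateStore."
  []
  (let [ctx      (atom nil)   ; kept as in the answer; a real processor would
                              ; use it in punctuate to forward and commit
        kv-store (atom nil)
        ;; helpers close over the same atoms as the reify below
        kv-get   (fn [k] (.get @kv-store k))
        kv-put   (fn [k v] (.put @kv-store k v))]
    (reify MiniProcessor
      (init [this context]
        (reset! ctx context)
        (reset! kv-store (get context "Counts")))
      (process [this _ line]
        ;; same counting logic as WordCountProcessor.process
        (doseq [word (str/split (str/lower-case line) #" ")]
          (kv-put word (inc (or (kv-get word) 0)))))
      (close [this]
        (reset! ctx nil)
        (reset! kv-store nil)))))
```

Calling word-count-processor twice yields two processors whose counts never mix. With the real API, a ProcessorSupplier's get method could simply call such a factory function.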