Tags: apache-kafka, apache-kafka-streams

Building a cache with Kafka Streams


I am trying to get a sense of what is possible and how to think when working with Kafka Streams.

The Use-Case:

There is a topic called Transactions:

  • key -> transactionReference (String)
  • value -> timestamp, approved/canceled (JSON String)

I want to create a cache that will hold all recent transactions (last 10 minutes).

The cache can be queried by a REST client by providing the transaction reference.

Questions:

  1. Is Kafka Streams (along with its materialized views) a good fit for implementing such a cache?
  2. If yes, how would you go about it? Remember it needs to keep only the last 10 minutes of transactions and discard older ones.
  3. If not, why not?

Solution

  • Yes, it's a very good idea to implement this with Kafka Streams. How to do it?

    1. First, create a class that represents the values of the cache:
    class Transaction {
        Instant createTime;
        Status status;                 // assumed to be an enum such as APPROVED / CANCELED
        String transactionReference;
        // getters (getCreateTime(), getStatus(), getTransactionReference()) are assumed by the code below
    }
    
    2. Second, create a class that handles the cache logic by implementing org.apache.kafka.streams.kstream.Transformer<K, V, R>:
    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {

        // keep entries for the last 10 minutes only
        private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);

        private KeyValueStore<String, Transaction> transactions;

        @Override
        public void init(ProcessorContext context) {
            this.transactions = context.getStateStore("transactions-store");
            // periodically scan the store on wall-clock time and evict expired entries
            context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                try (KeyValueIterator<String, Transaction> iterator = transactions.all()) {
                    iterator.forEachRemaining(kV -> {
                        if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                            transactions.delete(kV.key);
                        }
                    });
                }
            });
        }

        private boolean hasExpired(final long eventTimestampMs, final long currentWallClockTimeMs) {
            return (currentWallClockTimeMs - eventTimestampMs) > maintainDurationMs;
        }

        @Override
        public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
            // store the transaction the first time its reference is seen; nothing is forwarded downstream
            Transaction t = this.transactions.get(transaction.getTransactionReference());
            if (t == null) {
                transactions.put(transaction.getTransactionReference(), transaction);
            }
            return null;
        }

        @Override
        public void close() {
            // nothing to clean up; the state store is managed by Kafka Streams
        }
    }
    
    3. Then, register the transformer and its state store in the topology:
    static StreamsBuilder buildKafkaStreamsTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // in-memory store backing the cache; TRANSACTIONS is the name of the input topic
        StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("transactions-store"),
                Serdes.String(), JsonSerdes.forA(Transaction.class));
        builder.addStateStore(transferProcessKeyValueStore);

        builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
            .transform(TransactionsCache::new, "transactions-store");

        return builder;
    }
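
    A note on JsonSerdes.forA(...): it is not part of the Kafka Streams API, just a convenience for building a JSON Serde for a given class. A minimal sketch of such a helper, assuming Jackson (with the jsr310 module so Instant can be serialized) is on the classpath, could look like this:
    import java.io.IOException;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;

    public final class JsonSerdes {

        private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());

        // builds a Serde<T> that (de)serializes T as JSON
        public static <T> Serde<T> forA(Class<T> clazz) {
            Serializer<T> serializer = (topic, data) -> {
                try {
                    return MAPPER.writeValueAsBytes(data);
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            };
            Deserializer<T> deserializer = (topic, bytes) -> {
                try {
                    return bytes == null ? null : MAPPER.readValue(bytes, clazz);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            };
            return Serdes.serdeFrom(serializer, deserializer);
        }
    }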
    
    4. The next step is reading the data in an HTTP controller:
    @RestController
    public class TransactionsController {
    
        private final KafkaStreams kafkaStreams;
    
        public TransactionsController(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
    
        @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
        Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
            ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));
    
            return store.get(transactionReference);
        }
    }
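
    The controller above assumes a KafkaStreams bean already exists in the application context. One possible way to wire it up from the topology built in step 3 is sketched below (the Spring configuration class, application id and bootstrap servers are illustrative):
    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaStreamsConfig {

        @Bean(destroyMethod = "close")
        public KafkaStreams kafkaStreams() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transactions-cache");   // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative

            // buildKafkaStreamsTopology() is the method from step 3
            KafkaStreams kafkaStreams = new KafkaStreams(buildKafkaStreamsTopology().build(), props);
            kafkaStreams.start();
            return kafkaStreams;
        }
    }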
    
    
    5. Last thing. Remember that this in-memory cache is partitioned by default, so if you run several instances of your application you need to add some RPC mechanism to fetch the data from another instance on a cache miss (Kafka Interactive Queries); there are some very neat examples of this pattern available. A second solution is to use org.apache.kafka.streams.kstream.GlobalKTable<K, V>, as sketched below.
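
    For the GlobalKTable alternative, every instance materializes the full topic, so any instance can answer any lookup. A minimal sketch follows (the store name is illustrative; builder, kafkaStreams and transactionReference are the same objects as in the snippets above). Note that a GlobalKTable will not expire entries on its own, so the 10-minute limit would still need to be enforced separately, e.g. by checking the timestamp before returning a result:
    // materialize the Transactions topic as a global store instead of the transformer-backed local store
    builder.globalTable(TRANSACTIONS,
        Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)),
        Materialized.as("transactions-global-store"));

    // query it the same way as the local store, from any instance
    ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("transactions-global-store", QueryableStoreTypes.keyValueStore()));
    Transaction transaction = store.get(transactionReference);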