I am trying to get a sense of what is possible and how to think when working with Kafka Streams.
There is a topic called Transactions:
I want to create a cache that will hold all recent transactions (last 10 minutes).
The cache can be queried by a REST client by providing the transaction reference.
Yes, it's a very good idea to develop it in Kafka Streams. How to do it?
/**
 * A single transaction record as read from the transactions topic.
 *
 * <p>The getters are the accessors that {@code TransactionsCache} calls
 * ({@code getCreateTime()} and {@code getTransactionReference()}).
 */
class Transaction {
    /** Creation instant of the transaction; compared against the clock to decide expiry. */
    Instant createTime;
    /** Current status of the transaction. */
    Status status;
    /** Reference under which the transaction is stored and looked up. */
    String transactionReference;

    /** Returns the instant at which this transaction was created. */
    Instant getCreateTime() {
        return createTime;
    }

    /** Returns the status of this transaction. */
    Status getStatus() {
        return status;
    }

    /** Returns the reference identifying this transaction. */
    String getTransactionReference() {
        return transactionReference;
    }
}
org.apache.kafka.streams.kstream.Transformer<K, V, R>
:public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {
private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);
private KeyValueStore<String, Transaction> transactions;
@Override
public void init(ProcessorContext context) {
this.transactions = context.getStateStore("transactions-store");
context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
timestamp -> transactions.all()
.forEachRemaining(kV -> {
if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
transactions.delete(kV.key);
}
}));
}
private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
}
@Override
public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
Transaction t = this.transactions.get(transaction.getTransactionReference());
if (t == null) {
transactions.put(transaction.getTransactionReference(), transaction);
}
return null;
}
@Override
public void close() {
}
}
static StreamsBuilder buildKafkaStreamsTopology() {
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
builder.addStateStore(transferProcessKeyValueStore);
builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
.transform(TransactionsCache::new, "transactions-store");
return builder;
}
/**
 * REST endpoint that serves transactions straight from the
 * {@code "transactions-store"} Kafka Streams state store.
 */
@RestController
public class TransactionsController {

    private final KafkaStreams kafkaStreams;

    /**
     * @param kafkaStreams streams instance whose state store is queried on each request
     */
    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Handles {@code GET /transactions/{transactionReference}}.
     *
     * @param transactionReference reference taken from the request path, used as the store key
     * @return whatever the store holds for that key, passed through unchanged
     */
    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        return transactionsStore().get(transactionReference);
    }

    /** Obtains a read-only view of the transactions store from the streams instance. */
    private ReadOnlyKeyValueStore<String, Transaction> transactionsStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, Transaction>> query =
                StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore());
        return kafkaStreams.store(query);
    }
}
org.apache.kafka.streams.kstream.GlobalKTable<K, V>