Please share if there is a Spring cloud kafka streams example of creating and joins across multiple GlobalKTable.
This is derived from the Confluent GlobalKTables examples with two joins between GlobalKTables and KStreams. Note the first stream is through the standard "input"...
@Component
public class TableStreamListener {
// private final StreamsBuilder builder = new StreamsBuilder();
@EnableBinding(DataGen.class)
public class DataAnalyticsProcessorApplication {
/**
* DevNotes: compilation fails unless method returns a KStream
*
* @param ordersStream
* @param customers
* @param products
* @return
*/
@StreamListener
@SendTo("output")
public KStream<Object, EnrichedOrder> process(@Input("input") KStream<Object, Order> ordersStream,
@Input("customers") GlobalKTable<Object, Customer> customers,
@Input("products") GlobalKTable<Object, Product> products) {
// Join the orders stream to the customer global table. As this is global table
// we can use a non-key based join with out needing to repartition the input
// stream
KStream<Object, CustomerOrder> customerOrdersStream = ordersStream
// .peek((key, value) -> System.out.println("ordersStream -- key: " + key + " --
// value: " + value))
.join(customers, (key, value) -> value.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order));
// Join the enriched customer order stream with the product global table. As
// this is global table
// we can use a non-key based join without needing to repartition the input
// stream
KStream<Object, EnrichedOrder> enrichedOrdersStream = customerOrdersStream
// .peek((key, value) -> System.out.println("customerOrdersStream2 -- key: " +
// key + " -- value: " + value.toString()))
.join(products, (key, value) -> value.getOrder().getProductId(),
(customerOrder, product) -> new EnrichedOrder(product, customerOrder.getCustomer(),
customerOrder.getOrder()));
return enrichedOrdersStream;
}
}
interface DataGen extends KafkaStreamsProcessor {
@Input("customers")
GlobalKTable<?, ?> customers();
@Input("products")
GlobalKTable<?, ?> products();
}
}