Search code examples
spring-cloud-stream spring-kafka

Is there an example of creating a GlobalKTable using Spring Cloud Stream Kafka Streams?


Please share a Spring Cloud Stream Kafka Streams example, if one exists, that creates multiple GlobalKTables and joins across them.


Solution

  • This is derived from the Confluent GlobalKTables example and performs two joins between a KStream and GlobalKTables. Note that the first stream comes in through the standard "input"...

    /**
     * Component that wraps the order-enrichment stream listener together with the
     * binding interface that declares its two global-table inputs.
     */
    @Component
    public class TableStreamListener {

        /**
         * Binding-enabled holder for the listener method that enriches orders with
         * customer and product data. Bindings are described by {@link DataGen}.
         */
        @EnableBinding(DataGen.class)
        public class DataAnalyticsProcessorApplication {

            /**
             * Enriches every incoming order by joining it first against the customers
             * table and then against the products table.
             *
             * <p>Note carried over from the original author: compilation fails unless
             * this method returns a KStream.
             *
             * @param ordersStream orders arriving on the "input" binding
             * @param customers    global table on the "customers" binding, looked up by
             *                     the order's customer id
             * @param products     global table on the "products" binding, looked up by
             *                     the order's product id
             * @return stream of enriched orders, keyed like the input stream and sent
             *         to the "output" binding
             */
            @StreamListener
            @SendTo("output")
            public KStream<Object, EnrichedOrder> process(@Input("input") KStream<Object, Order> ordersStream,
                    @Input("customers") GlobalKTable<Object, Customer> customers,
                    @Input("products") GlobalKTable<Object, Product> products) {

                // Step 1: attach the customer to each order. The lookup key is derived
                // from the order value rather than the record key; because the table is
                // global, the input stream does not need to be repartitioned for this.
                KStream<Object, CustomerOrder> ordersWithCustomer = ordersStream.join(
                        customers,
                        (orderKey, order) -> order.getCustomerId(),
                        (order, customer) -> new CustomerOrder(customer, order));

                // Step 2: attach the product, again using a value-derived lookup key
                // against a global table, and hand the result straight back.
                return ordersWithCustomer.join(
                        products,
                        (orderKey, customerOrder) -> customerOrder.getOrder().getProductId(),
                        (customerOrder, product) -> new EnrichedOrder(
                                product,
                                customerOrder.getCustomer(),
                                customerOrder.getOrder()));
            }

        }

        /**
         * Binding contract for the listener: adds the "customers" and "products"
         * global-table inputs on top of whatever {@code KafkaStreamsProcessor}
         * declares (presumably the "input" and "output" bindings used by
         * {@code process} -- confirm against the binder version in use).
         */
        interface DataGen extends KafkaStreamsProcessor {

            /** Global table bound under the name "customers". */
            @Input("customers")
            GlobalKTable<?, ?> customers();

            /** Global table bound under the name "products". */
            @Input("products")
            GlobalKTable<?, ?> products();

        }
    }