spring-boot, apache-kafka-streams, spring-kafka

How to send a JSON data stream to multiple Kafka topics based on input fields


I have to consume JSON data coming into a Kafka stream and send it to different topics (one per distinct combination of appId and entity) for further consumption.
Topic names:

    app1.entity1
    app1.entity2
    app2.entity1
    app2.entity2

JSON data

    [
        {
            "appId": "app1",
            "entity": "entity1",
            "extractType": "txn",
            "status": "success",
            "fileId": "21151235"
        },
        {
            "appId": "app1",
            "entity": "entity2",
            "extractType": "txn",
            "status": "fail",
            "fileId": "2134234123"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "payment",
            "status": "success",
            "fileId": "2312de23e"
        },
        {
            "appId": "app2",
            "entity": "entity3",
            "extractType": "txn",
            "status": "fail",
            "fileId": "asxs3434"
        }
    ]

TestInput.java

    public class TestInput {
        private String appId;
        private String entity;
        private String extractType;
        private String status;
        private String fileId;

        // getters and setters
    }

SpringBootConfig.java

      @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
      public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
          Map<String, Object> config = new HashMap<>();
          config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
          config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
          config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
          config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JsonSerde<>(TestInput.class).getClass());
          config.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class);
          config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, TestInput.class);
          return new KafkaStreamsConfiguration(config);
      }

      @Bean
      public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
          KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);
                 // how to form key , group records and send to different topics
          return stream;
      }

I searched a lot but didn't find anything close to this that publishes data to topics dynamically. Please help, experts.


Solution

  • Use stream.branch()

    See https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/

    Next, let’s modify the requirement. Instead of processing all events in the stream, each microservice should take action only on a subset of relevant events. One way to handle this requirement is to have a microservice that subscribes to the original stream with all the events, examines each record and then takes action only on the events it cares about while discarding the rest. However, depending on the application, this may be undesirable or resource intensive.

    A cleaner way is to provide the service with a separate stream that contains only the relevant subset of events that the microservice cares about. To achieve this, a streaming application can branch the original event stream into different substreams using the method KStream#branch(). This results in new Kafka topics, so then the microservice can subscribe to one of the branched streams directly.

    ...
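    Below is a minimal sketch of how this could look for the topics in the question. It assumes TestInput exposes getAppId() and getEntity(), and that inputTopic is the input topic field from the question; adapt the names to your project.

        @Bean
        public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
            KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);

            // Re-key each record as "appId.entity", then split the stream into
            // one substream per known combination.
            KStream<String, TestInput>[] branches = stream
                    .selectKey((key, value) -> value.getAppId() + "." + value.getEntity())
                    .branch(
                            (key, value) -> key.equals("app1.entity1"),
                            (key, value) -> key.equals("app1.entity2"),
                            (key, value) -> key.equals("app2.entity1"),
                            (key, value) -> key.equals("app2.entity2"));

            // Write each branch to the topic matching its key.
            branches[0].to("app1.entity1");
            branches[1].to("app1.entity2");
            branches[2].to("app2.entity1");
            branches[3].to("app2.entity2");

            return stream;
        }

    Note that branch() requires the set of topics to be known when the topology is built. If new appId/entity combinations should be routed without a code change, KStream#to also accepts a TopicNameExtractor that computes the destination topic per record (the target topics must already exist, since Kafka Streams does not create output topics on the fly):

        stream.selectKey((key, value) -> value.getAppId() + "." + value.getEntity())
              .to((key, value, recordContext) -> key);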