I have to consume JSON data coming into a Kafka stream and send it to different topics (one per distinct combination of appId and entity) for further consumption.
Topic names:
app1.entity1
app1.entity2
app2.entity1
app2.entity2
Sample input:
[
  {
    "appId": "app1",
    "entity": "entity1",
    "extractType": "txn",
    "status": "success",
    "fileId": "21151235"
  },
  {
    "appId": "app1",
    "entity": "entity2",
    "extractType": "txn",
    "status": "fail",
    "fileId": "2134234123"
  },
  {
    "appId": "app2",
    "entity": "entity3",
    "extractType": "payment",
    "status": "success",
    "fileId": "2312de23e"
  },
  {
    "appId": "app2",
    "entity": "entity3",
    "extractType": "txn",
    "status": "fail",
    "fileId": "asxs3434"
  }
]
TestInput.java
public class TestInput {
    private String appId;
    private String entity;
    private String extractType;
    private String status;
    private String fileId;
    // getters and setters omitted for brevity
}
SpringBootConfig.java
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JsonSerde<>(TestInput.class).getClass());
    config.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
    config.put(JsonDeserializer.DEFAULT_VALUE_TYPE, TestInput.class);
    return new KafkaStreamsConfiguration(config);
}
@Bean
public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);
    // How do I form the key, group the records, and send them to different topics?
    return stream;
}
I have searched a lot but couldn't find anything close to publishing data to topics dynamically. Please help, experts.
Use stream.branch()
See https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/
Next, let’s modify the requirement. Instead of processing all events in the stream, each microservice should take action only on a subset of relevant events. One way to handle this requirement is to have a microservice that subscribes to the original stream with all the events, examines each record and then takes action only on the events it cares about while discarding the rest. However, depending on the application, this may be undesirable or resource intensive.
A cleaner way is to provide the service with a separate stream that contains only the relevant subset of events that the microservice cares about. To achieve this, a streaming application can branch the original event stream into different substreams using the method KStream#branch(). This results in new Kafka topics, so then the microservice can subscribe to one of the branched streams directly.
...
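Since the appId/entity combinations are known up front, a minimal sketch of the branch() approach, built on the stream from your kStream bean (and assuming TestInput exposes getAppId()/getEntity() getters, as its fields imply), could look like this:

// branch() splits the stream with ordered predicates; each record goes to
// the first substream whose predicate matches, so a catch-all at the end
// keeps unmatched records from being silently dropped.
KStream<String, TestInput>[] branches = stream.branch(
        (key, value) -> "app1".equals(value.getAppId()) && "entity1".equals(value.getEntity()),
        (key, value) -> "app1".equals(value.getAppId()) && "entity2".equals(value.getEntity()),
        (key, value) -> true);

branches[0].to("app1.entity1"); // each substream is written to its own fixed topic
branches[1].to("app1.entity2");
branches[2].to("unmatched");    // hypothetical dead-letter topic for everything else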
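That said, because your target topic name is simply appId + "." + entity, you do not actually need a fixed set of branches: KStream#to() has an overload that takes a TopicNameExtractor (available since Kafka Streams 2.0), which computes the destination topic per record. Note that Kafka Streams will not create these topics for you; they must already exist. A sketch under the same assumptions:

// Route each record dynamically: the lambda is a TopicNameExtractor that
// derives the destination topic name from the record itself.
stream.to(
        (key, value, recordContext) -> value.getAppId() + "." + value.getEntity(),
        Produced.with(Serdes.String(), new JsonSerde<>(TestInput.class)));

With this approach, a new appId/entity combination needs no code change, only the corresponding topic.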