Search code examples
apache-kafka · apache-kafka-streams · kafka-interactive-queries

How can we collect all the values for the same key into a list and return a Kafka Streams KStream with a String key and a list of String values?


I have data arriving on a Kafka topic as (key: id, value: {id: 1, body: ...}), i.e. the key of each message is the same as its id. However, there can be multiple messages with the same id but a different body, so I am getting a KStream<String, String>.

Now I want to take all the messages that have the same id (key), collect their values into a list, and return the result as

KStream<String, List<String>>

Any suggestions?


Solution

        // Create a stream with a state store
    
        // Builder that accumulates sources, processors and state stores for the topology.
        final StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Value serde for the store: a list of Strings, each element encoded with the String serde.
        final ListSerde<String> listValueSerde = new ListSerde<>(Serdes.String());

        // Persistent key-value store named LOG_TRACE_STATE_STORE, keyed by String.
        final StoreBuilder<KeyValueStore<String, List<String>>> traceStoreBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(LOG_TRACE_STATE_STORE),
                        Serdes.String(),
                        listValueSerde);

        // Register the store with the builder so it becomes part of the topology.
        streamsBuilder.addStateStore(traceStoreBuilder);

        // Source stream for TOPIC, handed to splitProcessor (defined elsewhere) for wiring.
        final KStream<String, String> sourceStream = streamsBuilder.stream(TOPIC);
        splitProcessor(sourceStream);
        logger.info("creating stream for topic {} ..", TOPIC);

        // Build the topology and wrap it in a KafkaStreams instance for the caller.
        final Topology builtTopology = streamsBuilder.build();
        final KafkaStreams streams = new KafkaStreams(builtTopology, streamConfiguration(bootstrapServers));
        return streams;
        
    
        // Stream List Serde 
        
        /**
         * {@link Serde} for {@code List<T>} values, assembled from a {@link ListSerializer} and a
         * {@link ListDeserializer} that both reuse the element serde supplied by the caller.
         *
         * @param <T> element type of the lists handled by this serde
         */
        public class ListSerde<T> implements Serde<List<T>> {

            /** Combined serializer/deserializer pair that every method delegates to. */
            private final Serde<List<T>> inner;

            /**
             * Builds the list serde around an element serde.
             *
             * @param avroSerde serde used for the individual list elements (any {@code Serde<T>}
             *                  is accepted)
             */
            public ListSerde(final Serde<T> avroSerde) {
                final Serializer<List<T>> listSerializer = new ListSerializer<>(avroSerde.serializer());
                final Deserializer<List<T>> listDeserializer = new ListDeserializer<>(avroSerde.deserializer());
                this.inner = Serdes.serdeFrom(listSerializer, listDeserializer);
            }

            /** @return the list serializer held by the wrapped serde */
            @Override
            public Serializer<List<T>> serializer() {
                return this.inner.serializer();
            }

            /** @return the list deserializer held by the wrapped serde */
            @Override
            public Deserializer<List<T>> deserializer() {
                return this.inner.deserializer();
            }

            /**
             * Forwards the configuration to the serializer first and then to the deserializer.
             *
             * @param configs configuration entries passed through unchanged
             * @param isKey   whether the serde is used for record keys
             */
            @Override
            public void configure(final Map<String, ?> configs, final boolean isKey) {
                final Serializer<List<T>> listSerializer = this.inner.serializer();
                listSerializer.configure(configs, isKey);
                final Deserializer<List<T>> listDeserializer = this.inner.deserializer();
                listDeserializer.configure(configs, isKey);
            }

            /** Closes the serializer first and then the deserializer. */
            @Override
            public void close() {
                final Serializer<List<T>> listSerializer = this.inner.serializer();
                listSerializer.close();
                final Deserializer<List<T>> listDeserializer = this.inner.deserializer();
                listDeserializer.close();
            }
        }
        
        // Serializer & deserializers 
        
        /**
         * {@link Serializer} that encodes a {@code List<T>} as a length-prefixed byte sequence.
         *
         * <p>Layout, as written by {@link DataOutputStream}: an {@code int} element count, then for
         * each element an {@code int} byte length followed by that many bytes produced by the
         * element serializer.
         *
         * @param <T> element type of the serialized lists
         */
        public class ListSerializer<T> implements Serializer<List<T>> {

            /** Serializer applied to every element of the list. */
            private final Serializer<T> valueSerializer;

            /**
             * @param valueSerializer serializer used for the individual list elements
             */
            public ListSerializer(final Serializer<T> valueSerializer) {
                this.valueSerializer = valueSerializer;
            }

            @Override
            public void configure(final Map<String, ?> configs, final boolean isKey) {
                // do nothing
            }

            /**
             * Serializes the list into the length-prefixed layout described on the class.
             *
             * @param topic topic name, passed through to the element serializer
             * @param list  list to encode; may be {@code null}
             * @return the encoded bytes, or {@code null} when {@code list} is {@code null}
             * @throws RuntimeException if writing to the in-memory stream fails
             */
            @Override
            public byte[] serialize(final String topic, final List<T> list) {
                // A null value must not blow up with a NullPointerException; returning null
                // mirrors the paired deserializer, which maps null/empty bytes back to null.
                if (list == null) {
                    return null;
                }
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                // try-with-resources closes (and thereby flushes) the stream on every path.
                try (DataOutputStream out = new DataOutputStream(baos)) {
                    out.writeInt(list.size());
                    for (final T element : list) {
                        // The element serializer must return non-null bytes; its length is the prefix.
                        final byte[] bytes = valueSerializer.serialize(topic, element);
                        out.writeInt(bytes.length);
                        out.write(bytes);
                    }
                } catch (final IOException e) {
                    throw new RuntimeException("unable to serialize List", e);
                }
                return baos.toByteArray();
            }

            @Override
            public void close() {
                // nothing to release
            }

        }
        
        //------------
        /**
         * {@link Deserializer} that decodes a length-prefixed byte sequence into a {@code List<T>}.
         *
         * <p>Expected layout, as read by {@link DataInputStream}: an {@code int} element count, then
         * for each element an {@code int} byte length followed by that many bytes, which are handed
         * to the element deserializer.
         *
         * @param <T> element type of the decoded lists
         */
        public class ListDeserializer<T> implements Deserializer<List<T>> {

            /** Deserializer applied to the bytes of every element. */
            private final Deserializer<T> valueDeserializer;

            /**
             * @param valueDeserializer deserializer used for the individual list elements
             */
            public ListDeserializer(final Deserializer<T> valueDeserializer) {
                this.valueDeserializer = valueDeserializer;
            }

            @Override
            public void configure(final Map<String, ?> configs, final boolean isKey) {
                // do nothing
            }

            /**
             * Decodes the given bytes into a list.
             *
             * @param s     topic name, passed through to the element deserializer
             * @param bytes encoded list; may be {@code null} or empty
             * @return the decoded list, or {@code null} when {@code bytes} is {@code null} or empty
             * @throws RuntimeException if the input ends before all announced bytes were read
             */
            @Override
            public List<T> deserialize(final String s, final byte[] bytes) {
                if (bytes == null || bytes.length == 0) {
                    return null;
                }
                final List<T> list = new ArrayList<>();
                // try-with-resources replaces the manual finally block that swallowed close errors.
                try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                    final int records = in.readInt();
                    for (int i = 0; i < records; i++) {
                        final byte[] valueBytes = new byte[in.readInt()];
                        // readFully either fills the whole buffer or throws EOFException;
                        // plain read() may return early and leave a partially filled element.
                        in.readFully(valueBytes);
                        list.add(valueDeserializer.deserialize(s, valueBytes));
                    }
                } catch (final IOException e) {
                    throw new RuntimeException("Unable to deserialize List", e);
                }
                return list;
            }

            @Override
            public void close() {
                // nothing to release
            }

        }
        /// Now create Stream Processors
        
        /**
         * Stream processor that hands the value of every incoming record to the monitoring
         * store obtained from {@code MonitoringStateStoreFactory}.
         */
        public class LogTraceStreamStateProcessor implements Processor<String, String> {

            private static final Logger logger = Logger.getLogger(LogTraceStreamStateProcessor.class);

            /** Store facade that receives the record values; assigned in {@link #init}. */
            IStore stateStore;

            /**
             * Fetches the monitoring store from the factory and lets it initialise itself with
             * the given processor context.
             *
             * @param context processor context forwarded to the store
             */
            @Override
            public void init(ProcessorContext context) {
                logger.info("initializing processor and looking for monitoring store");
                final IStore monitoringStore = MonitoringStateStoreFactory.getInstance().getStore();
                this.stateStore = monitoringStore;
                logger.debug("found the monitoring store - {} ", monitoringStore);
                monitoringStore.initLogTraceStoreProcess(context);
                logger.debug("initalizing monitoring store.");
            }

            /**
             * Passes the record value to the store; the record key is not used here.
             *
             * @param key   record key (ignored)
             * @param value record value handed to {@code storeLogTrace}
             */
            @Override
            public void process(String key, String value) {
                logger.debug("Storing the value for logtrace storage - {} ", value);
                this.stateStore.storeLogTrace(value);
                logger.debug("finished Storing the value for logtrace storage - {} ", value);
            }

            /** No resources are released here. */
            @Override
            public void close() {
                // intentionally empty
            }

        }
        
        // Look up the key-value state store by name through traceStreamContext and cast the
        // result to KeyValueStore<String, List<String>>. The cast is unchecked: nothing here
        // verifies that the store was registered with these key/value types.
        // NOTE(review): the store name used here (EXEID_REQ_REL_STORE) differs from the
        // LOG_TRACE_STATE_STORE name registered with the StreamsBuilder above; confirm that
        // both constants refer to a store that has actually been added to the topology.
        KeyValueStore<String, List<String>> stateStore =  (KeyValueStore<String, List<String>>) traceStreamContext.getStateStore(EXEID_REQ_REL_STORE);
        
        // For a new key, store a new list containing the message; if the key already exists, append the new message to its list
         
        /**
         * Unmarshals a trace event from its textual form and records an entry in the state
         * store under the event's execution id.
         *
         * <p>If the store already holds a non-empty list for that id, the entry is appended to
         * it; otherwise a new list is created. Processing failures are logged and not
         * propagated, and {@code traceStreamContext.commit()} is requested in every case.
         *
         * @param traceData serialized trace event, read via {@code mapper.readValue}
         */
        public void storeTraceData(String traceData) {
            try {
                logger.debug("Received the Trace value - {}", traceData);
                // No placeholder instance is needed: readValue returns the populated event.
                final TraceEvent tracer = mapper.readValue(traceData, TraceEvent.class);
                logger.debug("trace unmarshelling has been completed successfully !!!");

                final String key = tracer.getExecutionId();

                List<String> listEvents = stateStore.get(key);

                if (listEvents != null && !listEvents.isEmpty()) {
                    logger.debug("event is already in store so storing in the list for execution id - {}", key);
                } else {
                    logger.debug(
                            "event is not present in the store so creating a new list and adding into store for execution id - {}",
                            key);
                    listEvents = new ArrayList<>();
                }

                // NOTE(review): requestId is not declared in this method; presumably a field or
                // a value taken from the trace event - confirm where it is meant to come from.
                listEvents.add(requestId);
                stateStore.put(key, listEvents);

            } catch (Exception e) {
                // Only Exceptions are handled here; JVM Errors (e.g. OutOfMemoryError) propagate.
                logger.error("exception while processing the trace event .. ", e);
            } finally {
                try {
                    traceStreamContext.commit();
                } catch (Exception e2) {
                    // Route commit failures through the logger instead of printStackTrace().
                    logger.error("exception while committing the trace stream context .. ", e2);
                }
            }

        }
        /// now this is how you can access the message from state store
        /**
         * Provides read-only access to the trace store.
         *
         * @return the store obtained from {@code waitUntilStoreIsQueryable} for {@code KEY_NAME}
         *         (helper defined elsewhere; presumably it blocks until the store can be queried)
         */
        public ReadOnlyKeyValueStore<String, List<String>> tracerStore() {
            final ReadOnlyKeyValueStore<String, List<String>> queryableStore =
                    waitUntilStoreIsQueryable(KEY_NAME);
            return queryableStore;
        }