apache-flink flink-streaming

How to emit the result of processing an event without delay in Flink


We are considering Flink for a use case but are not sure whether it is suitable. Here is our use case: when an event e1 arrives, we need to process it and emit a result. The source and sink are not relevant to this discussion, but you can think of a message queue service as both. The processing of each event is entirely independent of other events, so while processing e1 we don't need e2 or any other event. The processing consists of step1, step2, step3, and step4, as shown in the diagram below. Note that step2 and step3 should run in parallel.

[Diagram: step1 fans out to step2 and step3, which run in parallel and feed step4]

Processing latency per event is critical for us, so I need to emit the result as soon as processing completes for that event instead of waiting for a window timeout. With my limited knowledge of Flink, I could only think of the approach below:

DataStream<Map<String, Object>> step1 = env.addSource(...);
DataStream<Map<String, Object>> step2 = step1.map(...);
DataStream<Map<String, Object>> step3 = step1.map(...);

Now, how do I combine the results from step2 and step3 and emit the result? This simple example has only two streams to merge, but there can be more. I could take a union of the streams, and use a unique event id to group the outputs of the intermediate steps that belong to a particular event.

DataStream<Map<String, Object>> mergedStream = step2.union(step3).keyBy(...);

But how do I emit the result? Ideally, I would like to say "emit the result as soon as I get output from step2 and step3 for a specific key" instead of "emit the result every 30 millis". The latter has two problems: it may emit partial results, and it adds delay. Is there any way to specify the former? I'm exploring Flink, but I'm open to considering alternatives if they solve my use case.


Solution

  • In step 1, add an event id. Then after the union, key the stream by the event id and use a RichFlatMapFunction to combine the results of steps 2 and 3 back into a single event. If steps 2 and 3 emit events of type EnrichedEvent, then step 4 can be:

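    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Buffers partial results per event id (keyed state) and emits once all branches have reported.
    static class FanIn extends RichFlatMapFunction<EnrichedEvent, EnrichedEvent> {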
        private transient ValueState<EnrichedEvent> enrichmentResponseState;
    
        @Override
        public void flatMap(EnrichedEvent value, Collector<EnrichedEvent> out) throws Exception {
            EnrichedEvent response = enrichmentResponseState.value();
    
            if (response != null) {
                response = response.combine(value);
            } else {
                response = value;
            }
    
            if (response.isComplete()) {
                out.collect(response);
                enrichmentResponseState.clear();
            } else {
                enrichmentResponseState.update(response);
            }
        }
    
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<EnrichedEvent> fanInStateDescriptor =
                new ValueStateDescriptor<>( "enrichmentResponse",
                    TypeInformation.of(new TypeHint<EnrichedEvent>() {})
                );
    
            enrichmentResponseState = getRuntimeContext().getState(fanInStateDescriptor);
        }
    }
    

    After that it's a simple matter to send the merged final result to a sink.
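
The answer does not show EnrichedEvent, so here is a minimal plain-Java sketch of what combine() and isComplete() might look like. Everything in it is an assumption made for illustration: the field names, the REQUIRED_STEPS set, the partial() helper, and the FanInSketch class, which uses a HashMap as a stand-in for Flink's keyed ValueState so the fan-in logic can be run without a cluster.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class FanInSketch {

    /** Hypothetical event type: accumulates one result per parallel step. */
    static final class EnrichedEvent {
        // Assumption: the branches that must report before an event is complete.
        static final Set<String> REQUIRED_STEPS = Set.of("step2", "step3");

        final String eventId;
        final Map<String, Object> results = new HashMap<>();

        EnrichedEvent(String eventId) {
            this.eventId = eventId;
        }

        /** Builds the partial result produced by a single step. */
        static EnrichedEvent partial(String eventId, String step, Object result) {
            EnrichedEvent e = new EnrichedEvent(eventId);
            e.results.put(step, result);
            return e;
        }

        /** Merges two partial results for the same event into a new one. */
        EnrichedEvent combine(EnrichedEvent other) {
            EnrichedEvent merged = new EnrichedEvent(eventId);
            merged.results.putAll(results);
            merged.results.putAll(other.results);
            return merged;
        }

        /** True once every required step has contributed a result. */
        boolean isComplete() {
            return results.keySet().containsAll(REQUIRED_STEPS);
        }
    }

    // Stand-in for Flink keyed state: one pending partial result per event id.
    private final Map<String, EnrichedEvent> pending = new HashMap<>();

    /** Mirrors FanIn.flatMap: returns the merged event once all branches have reported. */
    Optional<EnrichedEvent> accept(EnrichedEvent value) {
        EnrichedEvent stored = pending.get(value.eventId);
        EnrichedEvent response = (stored != null) ? stored.combine(value) : value;
        if (response.isComplete()) {
            pending.remove(value.eventId);   // same role as enrichmentResponseState.clear()
            return Optional.of(response);
        }
        pending.put(value.eventId, response); // same role as enrichmentResponseState.update(...)
        return Optional.empty();
    }

    public static void main(String[] args) {
        FanInSketch fanIn = new FanInSketch();
        // Results for e1 and e2 arrive interleaved; only a complete event is emitted.
        System.out.println(fanIn.accept(EnrichedEvent.partial("e1", "step2", "a")).isPresent()); // false
        System.out.println(fanIn.accept(EnrichedEvent.partial("e2", "step3", "b")).isPresent()); // false
        System.out.println(fanIn.accept(EnrichedEvent.partial("e1", "step3", "c")).isPresent()); // true
    }
}
```

In an actual job, the wiring would presumably be along the lines of step2.union(step3).keyBy(e -> e.getEventId()).flatMap(new FanIn()), where getEventId() is a hypothetical accessor for the id added in step 1. Because the merged event is emitted from flatMap as soon as the last branch reports, no window or timer is involved. With more than two parallel steps, only the required-step set would need to grow.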