apache-kafka apache-flink flink-streaming

Send a message to Kafka when a SessionWindow starts and ends


I want to send a message to a Kafka topic when a new SessionWindow is created and when it ends. I have the following code:

stream
    .filter(user -> user.isAdmin)
    .keyBy(user -> user.username)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
//what now? Trigger?

Now I want to send a message when a new session starts (with some metadata like the web browser and timestamps; this information is available in each element of the stream) and another message to Kafka when the session ends (in this example, 10 seconds after the last element, I think) with the total number of requests.

Is this possible in Flink? I think I should use a trigger, but I don't know how, and I can't find any examples.


Solution

  • You can write a custom window trigger.

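    1. How can you tell that a new session has started?
      Keep a piece of per-window state in the trigger that stays empty (null) until the first element arrives. If the state is still null in onElement, a session has just started.

    2. When does the session end?
      Right before the trigger returns TriggerResult.FIRE, that is, when the processing-time timer registered for the window fires.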

    Here is a demo based on Flink's ProcessingTimeTrigger. It only contains the logic relevant to the question; see the Flink source code for the remaining details.

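    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class MyProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
        // Session start. Trigger state is scoped per window and session windows merge, so a mergeable state replaces ValueState.
        private final ReducingStateDescriptor<Long> stateDescriptor =
                new ReducingStateDescriptor<>("session-start", new Min(), Long.class);

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> state = ctx.getPartitionedState(stateDescriptor);
            if (state.get() == null) {
                // No state yet: this element starts a new session.
                state.add(window.getStart());
            }
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            // The gap elapsed without new elements: the session ends here.
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE; // event time is not used by this trigger
        }

        @Override
        public boolean canMerge() {
            return true; // required when the window assigner merges windows (session windows)
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
            ctx.mergePartitionedState(stateDescriptor); // carry the session start into the merged window
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(stateDescriptor).clear();
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }

        private static class Min implements ReduceFunction<Long> {
            public Long reduce(Long a, Long b) { return Math.min(a, b); }
        }
    }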
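
To make the start and end points concrete without a Flink cluster, the session logic can be sketched in plain Java. This is only an illustration under stated assumptions: `SessionSketch`, `sessionEvents`, and the marker strings are hypothetical names, the timestamps are assumed to be sorted, and an element arriving exactly one gap after the previous one is treated as opening a new session. It uses neither Flink nor Kafka; it only shows where the two messages from the question would be produced and where the request count comes from.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of session-gap semantics (hypothetical helper, not a Flink API).
class SessionSketch {

    // Walks sorted timestamps (milliseconds) and returns one "START@t" marker per
    // session start and one "END@t count=n" marker per session end.
    static List<String> sessionEvents(long[] sortedTimestamps, long gapMillis) {
        List<String> events = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return events;
        }
        long last = sortedTimestamps[0];
        int count = 1;
        events.add("START@" + last);
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t - last >= gapMillis) {
                // The gap elapsed: the previous session ended at last + gap.
                events.add("END@" + (last + gapMillis) + " count=" + count);
                events.add("START@" + t);
                count = 0;
            }
            count++;
            last = t;
        }
        // The final session ends one gap after its last element.
        events.add("END@" + (last + gapMillis) + " count=" + count);
        return events;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 3_000, 8_000, 25_000};
        for (String event : sessionEvents(timestamps, 10_000)) {
            System.out.println(event);
        }
    }
}
```

With the timestamps 0, 3000, 8000 and 25000 and a 10-second gap, the first three elements form one session and the last element starts a second one. In the trigger above, the "START" marker corresponds to the null-state branch in onElement and the "END" marker to onProcessingTime.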