Flink Event Session Window not emitting records

I am writing a pipeline to group sessions per user, keyed by id and windowed with eventSessionWindow. I am using a periodic watermark assigner and a custom session accumulator that counts the events in a given session.

What is happening is that my window operator is consuming records but not emitting any. I am not sure what is missing here.

FlinkKafkaConsumer010<String> eventSource =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);

DataStream<Event> eventStream = env.addSource(eventSource
                new FlatMapFunction<String, Event>() {

                    public void flatMap(String value, Collector<Event> out) throws Exception {
                new AssignerWithPeriodicWatermarks<Event>() {
                    long maxTime;

                    public long extractTimestamp(Event element, long previousElementTimestamp) {
                        maxTime = Math.max(previousElementTimestamp, maxTime);
                        return previousElementTimestamp;

                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTime);

       DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value ->

                .aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
                    public pipe.SessionAccumulator createAccumulator() {
                        return new pipe.SessionAccumulator();

                    public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator;

                    public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator.getLocalValue();

                    public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
                        return prev;

                }, new WindowFunction<Session, Session, String, TimeWindow>() {
                    public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {

    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();

        public void add(Event e) {


        public Session getLocalValue() {
            return session;

        public void resetLocal() {
            session =  new Session();


        public void merge(Accumulator<Event, Session> accumulator) {


        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
            return sessionAccumulator;

    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();

        public void add(Event e) {


        public Session getLocalValue() {
            return session;

        public void resetLocal() {
            session =  new Session();


        public void merge(Accumulator<Event, Session> accumulator) {


        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(

            return sessionAccumulator;


  • If your watermarks are not advancing, this would explain why no results are being emitted by the window. Possible causes include:

    • Your events haven't been timestamped by Kafka, and thus previousElementTimestamp isn't set.
    • You have an idle Kafka partition holding back the watermarks. (This is a somewhat complex topic. If this turns out to be the cause of your problems, and you get stuck on it, please come back with a new question.)
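
    Both causes can be illustrated without Flink. The sketch below is dependency-free Java, and none of its names (WatermarkStallSketch, MaxTimestampTracker, combinedWatermark) are Flink APIs. It assumes that a record without a Kafka timestamp reaches the assigner with a negative previousElementTimestamp, and that an operator's watermark is the minimum of the watermarks of its inputs.

```java
import java.util.Arrays;

/** Dependency-free illustration; these names are hypothetical, not Flink APIs. */
final class WatermarkStallSketch {

    /** Mirrors the assigner in the question: remembers the largest timestamp seen. */
    static final class MaxTimestampTracker {
        long maxTime; // long fields default to 0, as in the question's assigner

        void observe(long previousElementTimestamp) {
            maxTime = Math.max(previousElementTimestamp, maxTime);
        }

        long currentWatermark() {
            return maxTime;
        }
    }

    /** Assumption: an operator can only advance to its slowest input. */
    static long combinedWatermark(long... perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        MaxTimestampTracker tracker = new MaxTimestampTracker();
        // Assumption: records without a timestamp carry a negative value.
        for (long ts : new long[] {-1L, -1L, -1L}) {
            tracker.observe(ts);
        }
        // The watermark never moves past its initial value.
        System.out.println(tracker.currentWatermark());

        // The third input is idle and has not produced a watermark yet,
        // so the combined watermark stays at Long.MIN_VALUE.
        System.out.println(combinedWatermark(1_000L, 2_000L, Long.MIN_VALUE));
    }
}
```

    In either case the window's end timestamp is never reached, so the session never fires.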

    Another possibility is that there is never a 5-minute gap in the events, in which case the events will accumulate in a never-ending session.
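
    To make the never-ending session concrete, here is a minimal plain-Java sketch of gap-based sessionization. It is not Flink code: SessionGapSketch, splitIntoSessions, and isClosed are hypothetical names, and the boundary handling (a session breaks only when the gap is strictly larger than the configured gap, and closes once the watermark reaches the last timestamp plus the gap minus one) is an assumption about how session windows behave.

```java
import java.util.ArrayList;
import java.util.List;

/** Dependency-free illustration; these names are hypothetical, not Flink APIs. */
final class SessionGapSketch {

    /** Splits ascending timestamps into sessions wherever neighbours are more than gapMs apart. */
    static List<List<Long>> splitIntoSessions(long[] sortedTimestamps, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    /** Assumption: a session can be emitted once the watermark reaches lastTs + gapMs - 1. */
    static boolean isClosed(long lastTimestampInSession, long gapMs, long watermark) {
        return watermark >= lastTimestampInSession + gapMs - 1;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60_000L; // the 5-minute gap discussed above

        // One event per minute for 30 minutes: a single session that is still open.
        long[] steady = new long[30];
        for (int i = 0; i < steady.length; i++) {
            steady[i] = i * 60_000L;
        }
        long watermark = steady[steady.length - 1];
        System.out.println(splitIntoSessions(steady, gapMs).size());
        System.out.println(isClosed(steady[steady.length - 1], gapMs, watermark));
    }
}
```

    With a steady stream of events, the last timestamp keeps moving forward, so the closing condition is never met until the user actually goes quiet for longer than the gap.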

    Also, you don't appear to have included a sink. If you don't print or otherwise send the results to a sink, Flink won't do anything.

    And don't forget that you must call env.execute() to get anything to happen.

    A few other things:

    Your watermark generator isn't allowing for any out-of-orderness, so the window is going to ignore all out-of-order events (because they will be late). If your events have strictly ascending timestamps, you should go ahead and use an AscendingTimestampExtractor; if they can be out of order, then a BoundedOutOfOrdernessTimestampExtractor is appropriate.
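
    The idea behind a bounded-out-of-orderness watermark can be sketched in a few lines of plain Java. This is an illustration of the logic only, not the Flink class: BoundedOutOfOrdernessSketch and its methods are hypothetical names, and the 200 ms tolerance is an arbitrary value chosen for the example.

```java
/** Dependency-free illustration; these names are hypothetical, not Flink APIs. */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp. */
    void observe(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** The watermark trails the largest timestamp seen by the configured tolerance. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - maxOutOfOrdernessMs;
    }

    /** True if an event with this timestamp would arrive behind the current watermark. */
    boolean isBehindWatermark(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        // Zero tolerance, like the assigner in the question.
        BoundedOutOfOrdernessSketch strict = new BoundedOutOfOrdernessSketch(0L);
        strict.observe(1_000L);
        System.out.println(strict.isBehindWatermark(900L)); // true

        // 200 ms of tolerance (an arbitrary example value).
        BoundedOutOfOrdernessSketch tolerant = new BoundedOutOfOrdernessSketch(200L);
        tolerant.observe(1_000L);
        System.out.println(tolerant.isBehindWatermark(900L)); // false
    }
}
```

    The trade-off is latency: the larger the tolerance, the longer a window waits before it can fire.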

    Your WindowFunction is superfluous. It is simply forwarding downstream the result from the aggregator, so you could remove it.

    You have posted two different implementations of SessionAccumulator.