
Flink Event Session Window not emitting records


I am writing a pipeline that groups events into sessions per user, keyed by id, using EventTimeSessionWindows. I am using a periodic watermark assigner and a custom session accumulator that counts the events in a given session.

What is happening is that my window operator consumes records but does not emit anything. I am not sure what is missing here.


FlinkKafkaConsumer010<String> eventSource =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
        eventSource.setStartFromLatest();

DataStream<Event> eventStream = env.addSource(eventSource
        ).flatMap(
                new FlatMapFunction<String, Event>() {

                    @Override
                    public void flatMap(String value, Collector<Event> out) throws Exception {
                        out.collect(Event.toEvent(value));
                    }
                }
        ).assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Event>() {
                    long maxTime;

                    @Override
                    public long extractTimestamp(Event element, long previousElementTimestamp) {
                        maxTime = Math.max(previousElementTimestamp, maxTime);
                        return previousElementTimestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTime);
                    }
                }
        );

        DataStream<Session> session_stream = eventStream.keyBy((KeySelector<Event, String>) value -> value.id)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))

                .aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
                    @Override
                    public pipe.SessionAccumulator createAccumulator() {
                        return new pipe.SessionAccumulator();
                    }

                    @Override
                    public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
                        sessionAccumulator.add(e);
                        return sessionAccumulator;
                    }

                    @Override
                    public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator.getLocalValue();
                    }

                    @Override
                    public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
                        prev.merge(next);
                        return prev;
                    }

                }, new WindowFunction<Session, Session, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
                        collector.collect(iterable.iterator().next());
                    }
                });


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
            );
            return sessionAccumulator;
        }
    }


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
                    session.lastEventTime,
                    session.earliestEventTime,
                    session.count
            );
            return sessionAccumulator;
        }
    }


Solution

  • If your watermarks are not advancing, this would explain why no results are being emitted by the window. Possible causes include:

    • Your events haven't been timestamped by Kafka, and thus previousElementTimestamp isn't set.
    • You have an idle Kafka partition holding back the watermarks. (This is a somewhat complex topic. If this turns out to be the cause of your problems, and you get stuck on it, please come back with a new question.)
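
The first cause can be illustrated with a plain-Java model of the assigner from the question (no Flink dependency). It is only a sketch: it assumes that a negative value is passed as previousElementTimestamp when the record carries no timestamp, as the extractTimestamp Javadoc describes, and the names StuckWatermarkDemo, NO_TIMESTAMP, and watermarkAfter are made up for the illustration.

```java
// Plain-Java model of the assigner in the question; this is not Flink code.
final class StuckWatermarkDemo {
    // Assumption: stand-in for the negative value seen when no timestamp is attached.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Defaults to 0, exactly like the field in the question.
    private long maxTime;

    // Same logic as extractTimestamp in the question.
    long extractTimestamp(long previousElementTimestamp) {
        maxTime = Math.max(previousElementTimestamp, maxTime);
        return previousElementTimestamp;
    }

    // Same logic as getCurrentWatermark in the question.
    long currentWatermark() {
        return maxTime;
    }

    // Feeds a sequence of "previous" timestamps through the model
    // and returns the watermark it would report afterwards.
    static long watermarkAfter(long[] previousTimestamps) {
        StuckWatermarkDemo assigner = new StuckWatermarkDemo();
        for (long ts : previousTimestamps) {
            assigner.extractTimestamp(ts);
        }
        return assigner.currentWatermark();
    }

    public static void main(String[] args) {
        long[] unstamped = {NO_TIMESTAMP, NO_TIMESTAMP, NO_TIMESTAMP};
        long[] stamped = {1_000L, 2_000L, 3_000L};
        System.out.println("unstamped: " + watermarkAfter(unstamped));
        System.out.println("stamped:   " + watermarkAfter(stamped));
    }
}
```

With unstamped records the modelled watermark stays at its initial value, so an event-time window would never see time advance. Extracting the timestamp from a field of the event itself, rather than relying on previousElementTimestamp, avoids that dependency.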

    Another possibility is that there is never a 5-minute gap in the events for a given key, in which case the events will accumulate in a never-ending session.
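
To make the never-ending session concrete, here is a simplified plain-Java model of session splitting for a single key. It is a sketch rather than Flink's actual window-merging code: it assumes timestamps arrive sorted and starts a new session only when the distance to the previous event exceeds the gap. SessionGapDemo, everyMinute, and countSessions are hypothetical names.

```java
// Simplified model of session windows for one key; not Flink's implementation.
final class SessionGapDemo {
    static final long GAP_MS = 5 * 60 * 1000L; // the 5-minute gap from the question

    // Builds n timestamps spaced one minute apart, starting at 0.
    static long[] everyMinute(int n) {
        long[] timestamps = new long[n];
        for (int i = 0; i < n; i++) {
            timestamps[i] = i * 60_000L;
        }
        return timestamps;
    }

    // Counts sessions in an ascending list of timestamps: a new session
    // starts only when two consecutive events are more than gapMs apart.
    static int countSessions(long[] sortedTimestamps, long gapMs) {
        if (sortedTimestamps.length == 0) {
            return 0;
        }
        int sessions = 1;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            if (sortedTimestamps[i] - sortedTimestamps[i - 1] > gapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Two hours of steady traffic: still a single, unfinished session.
        System.out.println(countSessions(everyMinute(120), GAP_MS));
        // A ten-minute pause splits the stream into two sessions.
        System.out.println(countSessions(new long[]{0L, 60_000L, 660_000L}, GAP_MS));
    }
}
```

In this model a key that receives an event every minute never closes its session, so nothing would be emitted for it until the traffic pauses for longer than the gap.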

    Also, you don't appear to have included a sink. If you don't print or otherwise send the results to a sink, Flink won't do anything.

    And don't forget that you must call env.execute() to get anything to happen.

    A few other things:

    Your watermark generator isn't allowing for any out-of-orderness, so the window is going to ignore all out-of-order events (because they will be late). If your events have strictly ascending timestamps, you should go ahead and use an AscendingTimestampExtractor; if they can be out of order, then a BoundedOutOfOrdernessTimestampExtractor is appropriate.
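
The effect of an out-of-orderness bound can be sketched in plain Java. This is a simplified model, not the Flink extractor: it assumes the watermark is recomputed after every event as the largest timestamp seen minus the bound (real periodic watermarks are emitted on an interval), and it only counts events that arrive at or behind that watermark. OutOfOrdernessDemo and countBehindWatermark are hypothetical names.

```java
// Simplified model of a bounded-out-of-orderness watermark; not Flink code.
final class OutOfOrdernessDemo {

    // Counts events whose timestamp is at or behind the watermark at the
    // moment they arrive, where watermark = max timestamp seen - boundMs.
    static int countBehindWatermark(long[] arrivalOrder, long boundMs) {
        boolean seenAny = false;
        long maxSeen = 0L;
        int behind = 0;
        for (long ts : arrivalOrder) {
            if (seenAny && ts <= maxSeen - boundMs) {
                behind++;
            }
            maxSeen = seenAny ? Math.max(maxSeen, ts) : ts;
            seenAny = true;
        }
        return behind;
    }

    public static void main(String[] args) {
        // The event stamped 2000 arrives after the one stamped 3000.
        long[] arrivalOrder = {1_000L, 3_000L, 2_000L, 4_000L};
        System.out.println("bound 0 ms:    " + countBehindWatermark(arrivalOrder, 0L));
        System.out.println("bound 5000 ms: " + countBehindWatermark(arrivalOrder, 5_000L));
    }
}
```

With a bound of zero, which is what the assigner in the question amounts to, the out-of-order event is already behind the watermark when it arrives. A bound larger than the expected disorder keeps it ahead of the watermark, at the cost of results being emitted that much later.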

    Your WindowFunction is superfluous. It is simply forwarding downstream the result from the aggregator, so you could remove it.

    You have posted two different implementations of SessionAccumulator.