java, apache-flink, flink-streaming

Apache Flink: Custom trigger behaves unexpectedly


I have a DataStream of events, each with a property that identifies the batch of produced elements it belongs to. That property, let's call it 'batchNumber', is constant in every event I ingest from the same production batch, and I receive multiple events per batch.

I want to analyze machine performance within a batch whenever the 'batchNumber' changes. My approach is to key the stream by 'batchNumber' and collect events into a global window. I would expect this to partition the stream into windows that each contain every event with the same 'batchNumber'. Then I define a trigger that should fire when the 'batchNumber' changes, so I can analyze the aggregated data in a ProcessWindowFunction.

My problems are:

  • The trigger doesn't always fire when the 'batchNumber' changes
  • Even when it does fire, only one element is aggregated; I'm expecting close to 200

This is the code I'm using.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public class BatchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {

        private static final long serialVersionUID = 1L;

        // Descriptor for the per-key state that remembers the last batchnr seen.
        private final ValueStateDescriptor<Integer> prevbatchnr =
                new ValueStateDescriptor<>("batchnr", Integer.class);

        public BatchnrTrigger() {}

        @Override
        public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

            ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);

            if (batchnrState.value() == null || element.batchnr != batchnrState.value()) {
                System.out.println("batchnr BEFORE: " + batchnrState.value() + "   NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
                batchnrState.update(element.batchnr);
                return TriggerResult.FIRE;
            }

            System.out.println("batchnr BEFORE: " + batchnrState.value() + "   NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
            batchnrState.update(element.batchnr);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        }
    }

This is how I call this trigger:

    DataStream<String> imaginePaperDataStream = nifiStreamSource
            .map(new ImaginePaperDataConverter())
            .keyBy((ImaginePaperData event) -> event.lunum)
            .window(GlobalWindows.create())
            .trigger(new BatchnrTrigger())
            .process(new ImaginePaperWindowReportFunction());

I'm aware that my question is similar to this question, but I am using ValueState and I don't think my firing condition is similar at all.

How can I get this working?


Solution

  • Are you sure you want to key the stream by event.lunum? That makes sense if you are expecting roughly 200 events for each distinct value of lunum. But if you only have one event per batch for each value of lunum, that would explain the behavior you are seeing.

    Also, are you sure your events are being processed in order? If the batches are being interleaved somewhere in your processing pipeline by race conditions between parallel processes, that might also help explain what you are seeing.

    Furthermore, you should be clearing the state in the Trigger's clear method, and you will need an Evictor that removes elements from the window after it has been processed (see the first sketch below).

    This part of the window API is quite complex. I think this particular application would be much more straightforwardly implemented as a RichFlatMapFunction that gathers items in ListState until the batch number (which you'd keep in ValueState) changes (see the second sketch below).
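
If you do stay with the window approach, here is a minimal sketch of those two fixes, assuming the trigger keeps its ValueStateDescriptor in a field named prevbatchnr as in the question; the evictor class name EvictAllAfterProcessing is a placeholder, not a Flink-provided class:

    import java.util.Iterator;

    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    // Inside the trigger: clear the per-key state when the window is purged.
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(prevbatchnr).clear();
    }

    // An evictor that drops every element after the window function has run,
    // so already-processed elements are not aggregated again on the next firing.
    public class EvictAllAfterProcessing implements Evictor<ImaginePaperData, GlobalWindow> {

        @Override
        public void evictBefore(Iterable<TimestampedValue<ImaginePaperData>> elements,
                                int size, GlobalWindow window, EvictorContext ctx) {
            // Keep everything: the window function should see the whole batch.
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<ImaginePaperData>> elements,
                               int size, GlobalWindow window, EvictorContext ctx) {
            for (Iterator<TimestampedValue<ImaginePaperData>> it = elements.iterator(); it.hasNext(); ) {
                it.next();
                it.remove(); // eviction works by removing from the iterator
            }
        }
    }

You would attach it with .evictor(new EvictAllAfterProcessing()) right after the .trigger(...) call.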
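
And here is a minimal sketch of the RichFlatMapFunction alternative, assuming the same ImaginePaperData type with an int batchnr field; the class name BatchCollector and the String report it emits are placeholders:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class BatchCollector extends RichFlatMapFunction<ImaginePaperData, String> {

        private transient ValueState<Integer> currentBatchnr;   // batch number currently being gathered
        private transient ListState<ImaginePaperData> elements; // events gathered for that batch

        @Override
        public void open(Configuration parameters) {
            currentBatchnr = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("current-batchnr", Integer.class));
            elements = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("batch-elements", ImaginePaperData.class));
        }

        @Override
        public void flatMap(ImaginePaperData event, Collector<String> out) throws Exception {
            Integer prev = currentBatchnr.value();

            if (prev != null && prev != event.batchnr) {
                // The batch number changed: the previous batch is complete,
                // so analyze it, emit a report, and discard its elements.
                List<ImaginePaperData> batch = new ArrayList<>();
                for (ImaginePaperData e : elements.get()) {
                    batch.add(e);
                }
                out.collect("batch " + prev + ": " + batch.size() + " events");
                elements.clear();
            }

            currentBatchnr.update(event.batchnr);
            elements.add(event);
        }
    }

The stream still needs a keyBy before the flatMap so the keyed state works; you would then replace the window/trigger/process chain with .flatMap(new BatchCollector()). One caveat of this design: a batch is only emitted when the next batch number arrives, so the final batch of a finite stream needs separate handling.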