Tags: junit, apache-flink, flink-cep

JUnit-testing a DataStream of type Either with flinkspector


I'm writing a test to check that the timeout of my Flink CEP pattern works correctly. I'm using flinkspector for this and I have the following test case:

@Test
public void SameDoor_TwoStatuses_OneSecondTimeoutPattern() {
    // Arrange
    long now = new Date().getTime();
    DoorEvent event1 = new DoorEvent();
    event1.setId(123);
    event1.getDoor().setId(1);
    event1.getDoor().setStatus("statusaaaaaa");
    event1.setTimestamp(now);

    EventTimeInputBuilder<DoorEvent> builder = EventTimeInputBuilder.startWith(event1, event1.getTimestamp());
    DataStream<DoorEvent> stream = createTestStream(builder).assignTimestampsAndWatermarks(new TestTimestampExtractor<DoorEvent>());

    // Act
    Pattern<DoorEvent, ?> pattern = StatusNotFollowedByAnotherStatusPattern.getPatternForSameDoor(1, "firstevent", "statusaaaaaa","secondevent", "status2");
    PatternStream<DoorEvent> pStream = CEP.pattern(stream, pattern);

    DataStream<Either<Integer,Tuple2<Integer,Integer>>> patterns = pStream.select(getEventIdOfTimeoutEvent(),selectEventIdsOfPatterns()).forward();
    patterns.print(); //  prints Left(123)

    ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>> expectedRecords = 
        new ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>>()
            .expect(new Left<Integer, Tuple2<Integer,Integer>>(123));

    expectedRecords.refine().sameFrequency();

    // Assert
    assertStream(patterns, expectedRecords); 
}

private PatternSelectFunction<DoorEvent, Tuple2<Integer, Integer>> selectEventIdsOfPatterns(){
    return new PatternSelectFunction<DoorEvent, Tuple2<Integer,Integer>>() {
        private static final long serialVersionUID = 3830508947015151715L;
        @Override
        public Tuple2<Integer,Integer> select(Map<String, List<DoorEvent>> pattern) throws Exception {
            // Map the matched events to (first event id, second event id)
            Tuple2<Integer,Integer> t = new Tuple2<Integer,Integer>();
            t.f0 = pattern.get("firstevent").get(0).getId();
            t.f1 = pattern.get("secondevent").get(0).getId();
            return t;
        }
    };
}

private PatternTimeoutFunction<DoorEvent, Integer> getEventIdOfTimeoutEvent(){
    return new PatternTimeoutFunction<DoorEvent, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer timeout(Map<String, List<DoorEvent>> pattern, long timeoutTimestamp) throws Exception {
            // When the pattern times out, return the id of the first matched event
            int id = pattern.get("firstevent").get(0).getId();
            System.out.println("Timeout triggered on eventstatus " + pattern.get("firstevent").get(0).getDoor().getStatus());
            return id;
        }

    };
}

In the PatternTimeoutFunction my code does print the status statusaaaaaa, which is the status of the first event in the pattern. The second status is not detected within the defined time period, so the timeout is triggered and adds an integer to the patterns stream. How do I express in my ExpectedRecords that I expect an Either.Left with a value of 123?
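
For reference, the records in the patterns stream have the following shape (a minimal sketch using Flink's Either API; the concrete values are illustrative): timed-out partial matches come out of the PatternTimeoutFunction as Either.Left, completed matches come out of the PatternSelectFunction as Either.Right.

// Minimal sketch of the record shape in the patterns stream (values are illustrative).
Either<Integer, Tuple2<Integer, Integer>> timedOut = Either.Left(123);                  // timeout branch
Either<Integer, Tuple2<Integer, Integer>> matched = Either.Right(Tuple2.of(123, 456));  // match branch

if (timedOut.isLeft()) {
    // left() returns the value produced by the PatternTimeoutFunction
    System.out.println("timeout for event id " + timedOut.left()); // prints 123
}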

EDIT
The error I currently have is:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.emitTimedOutSequences(TimeoutKeyedCEPPatternOperator.java:77)
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.advanceTime(TimeoutKeyedCEPPatternOperator.java:68)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:242)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type.
    at org.apache.flink.api.java.typeutils.EitherTypeInfoFactory.createTypeInfo(EitherTypeInfoFactory.java:37)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromFactory(TypeExtractor.java:1233)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2054)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:2044)
    at io.flinkspector.datastream.functions.TestSink.invoke(TestSink.java:82)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 26 more

Solution

  • The problem is a bug in flinkspector's TestSink function. TestSink tries to extract the generic parameters of the Left instance at runtime, which is not possible because that information is erased. Instead, the type information would have to be passed to TestSink when it is instantiated so that it can create the correct type serializer. Please open a corresponding issue at the GitHub repository to let the developers know. A possible workaround is sketched below.
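  • Until that is fixed upstream, one possible workaround is to map the Either stream to a type whose TypeInformation can be extracted from a runtime instance, such as String, before handing it to assertStream. This is only a sketch under the assumption that the rest of the test stays as in the question; the String rendering and the flattened variable name are made up for illustration (MapFunction comes from org.apache.flink.api.common.functions).

DataStream<String> flattened = patterns
    .map(new MapFunction<Either<Integer, Tuple2<Integer, Integer>>, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(Either<Integer, Tuple2<Integer, Integer>> value) {
            // Render both sides in a stable textual form so the TestSink only ever sees Strings
            return value.isLeft()
                ? "Left(" + value.left() + ")"
                : "Right(" + value.right() + ")";
        }
    });

ExpectedRecords<String> expected = new ExpectedRecords<String>().expect("Left(123)");

assertStream(flattened, expected);

This only asserts on the rendered form of the record; once TestSink accepts explicit type information, the Either-based expectation from the question should work as written.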