I'm trying to test a RichCoFlatMapFunction that I use to perform a left join of two streams. It looks something like this:
private ValueState<Card> currentValueState;
private ListState<Card> historicListState;

@Override
public void open(Configuration parameters) throws Exception {
    currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
    historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
}

@Override
public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
    Iterable<Card> historicCardList = historicListState.get();
    if (Iterables.size(historicCardList) > 0) {
        out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList)));
    } else {
        currentValueState.update(currentCard);
        out.collect(new Tuple2<>(currentCard, null));
    }
}

@Override
public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
    historicListState.add(historicCard);
}
In the flatMap1 method, I emit null as the second tuple field when no historic Card is found:
out.collect(new Tuple2<>(currentCard, null));
The problem is that when I try to test this functionality, I get this error:
Automatic type extraction is not possible on candidates with null values. Please specify the types directly.
This is how I'm trying to test the RichCoFlatMapFunction:
@Test
public void testFlatMap() throws Exception {
    final Card current = currentCard(2L);
    final Card historic = historicCard(2L);
    final List<Card> historicList = new ArrayList<>();
    historicList.add(historic);

    CoStreamFlatMap<Card, Card, Tuple2<Card, List<Card>>> operator = new CoStreamFlatMap<>(new LeftJoin());
    KeyedTwoInputStreamOperatorTestHarness<Long, Card, Card, Tuple2<Card, List<Card>>> testHarness =
        new KeyedTwoInputStreamOperatorTestHarness<>(
            operator,
            (Card c) -> c.getCardHash(),
            (Card h) -> h.getCardHash(),
            BasicTypeInfo.LONG_TYPE_INFO);

    testHarness.setup();
    testHarness.open();
    testHarness.processElement1(new StreamRecord<>(current));
    testHarness.processElement2(new StreamRecord<>(historic));

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    expectedOutput.add(new StreamRecord<>(new Tuple2<>(current, historicList)));

    // Check that the result is correct
    ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
    TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, actualOutput);
}
Any help would be much appreciated. I'm fairly new to Apache Flink and to unit testing with it. Thanks.
The problem is that the KeyedTwoInputStreamOperatorTestHarness does not know how to serialize the outputs of your LeftJoin operator. You can specify an output serializer via KeyedTwoInputStreamOperatorTestHarness.setup(TypeSerializer<OUT> outputSerializer).
In your case it would be:
testHarness.setup(TypeInformation.of(new TypeHint<Tuple2<Card, List<Card>>>() {}).createSerializer(new ExecutionConfig()));
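To picture why the null field matters, here is a toy model in plain Java. It is not Flink code: the class NullTypeInference and the method inferFieldTypes are hypothetical names made up for this illustration. It rests on the assumption that, when no output serializer is given, the harness tries to derive the output type from an emitted record, which cannot work for a field whose value is null. Declaring the types up front, as the setup(...) call above does, needs no sample value at all.

```java
import java.util.Arrays;
import java.util.List;

public class NullTypeInference {

    /**
     * Toy stand-in for value-based type extraction (hypothetical, not a Flink API):
     * derives one runtime class per field from a sample record.
     */
    static List<Class<?>> inferFieldTypes(Object... fields) {
        Class<?>[] types = new Class<?>[fields.length];
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                // A null reference carries no runtime class, so nothing can be inferred.
                throw new IllegalArgumentException(
                    "Cannot infer a type for null field " + i + "; specify the types directly.");
            }
            types[i] = fields[i].getClass();
        }
        return Arrays.asList(types);
    }

    public static void main(String[] args) {
        // Every field is non-null, so inference from the sample succeeds.
        System.out.println(inferFieldTypes("card", 2L));

        // A record shaped like (currentCard, null) defeats inference.
        try {
            inferFieldTypes("card", null);
        } catch (IllegalArgumentException e) {
            System.out.println("inference failed: " + e.getMessage());
        }

        // Declaring the types explicitly does not depend on any sample value.
        List<Class<?>> declared = Arrays.asList(String.class, List.class);
        System.out.println("declared: " + declared);
    }
}
```

In the real test, the explicit declaration is the TypeHint passed to TypeInformation.of(...), so the first record may contain null without triggering type extraction.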