For a 1:1 join I'm using a KeyedCoProcessFunction with two streams: a lookup stream (100 records per second) and a clickstream (10,000 records per second). In processElement2 I look up the key in a MapState<Long,Row>. If it is found, I enrich the clickstream record with it; otherwise I send the record to a side output, which is later sunk to Kafka. I haven't applied a window to either input stream. On the DLQ topic in Kafka I continuously see 1-2 records produced per second. How can I wait a few milliseconds for the lookup id in processElement2 before pushing the record to the side output?
val joinStream = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"))
    .connect(clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid")))
    .process(new EnrichJoinFunction());

public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
  private MapState<Long, Row> map = null;

  @Override
  public void open(Configuration parameters) throws Exception {
    val MapStateDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    /*MapStateDescriptor.setQueryable("test");*/
    map = getRuntimeContext().getMapState(MapStateDescriptor);
  }

  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    val id = lookupRow.<Long>getFieldAs("id");
    if (!map.contains(id)) {
      map.put(id, lookupRow);
    }
  }

  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    val id = clickRow.<Long>getFieldAs("id");
    if (map.contains(id)) {
      // enrich join
      val joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      // lookup entry not yet arrived, send it to side output - dlq
      ctx.output(outputTag, clickRow);
    }
  }

  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    Row joinedRow = new Row(RowKind.INSERT, 13);
    // row setter join output
    return joinedRow;
  }
}
You can use the TimerService to achieve this.
The idea is to store the clickstream rows that don't have immediately matching lookup data in a dedicated MapState<Long,Row> and register a processing-time or event-time timer that fires some time later. In the timer callback, try the join between lookup data and clickstream data again. If there is still no match, send the click event to the side output.
It might look as follows:
public static class EnrichJoinFunction
    extends KeyedCoProcessFunction<Long, Row, Row, Row> {

  final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
  // lookup rows, kept for 15 days
  private MapState<Long, Row> map = null;
  // click rows waiting for their lookup row
  private MapState<Long, Row> clickstreamState = null;

  @Override
  public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<Long, Row> lookupStateDescriptor =
        new MapStateDescriptor<Long, Row>(
            "state",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    lookupStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
    /*lookupStateDescriptor.setQueryable("test");*/
    map = getRuntimeContext().getMapState(lookupStateDescriptor);

    MapStateDescriptor<Long, Row> clickstreamStateDescriptor =
        new MapStateDescriptor<Long, Row>(
            "clickstreamState",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<Row>() {}));
    clickstreamStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
    clickstreamState = getRuntimeContext().getMapState(clickstreamStateDescriptor);
  }

  @Override
  public void processElement1(
      Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
    Long id = lookupRow.<Long>getFieldAs("id");
    if (!map.contains(id)) {
      map.put(id, lookupRow);
    }
    // immediately join any buffered click event that was waiting for this lookup row
    if (clickstreamState.contains(id)) {
      // enrich join
      Row joinRow = join(clickstreamState.get(id), lookupRow);
      out.collect(joinRow);
      clickstreamState.remove(id);
    }
  }

  @Override
  public void processElement2(
      Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
      throws Exception {
    log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    // the click stream is keyed by "lookupid", so the current key is the lookup row's id
    Long id = ctx.getCurrentKey();
    if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      // buffer the click and check again after 1 second of processing time;
      // note that a second unmatched click for the same key overwrites the first
      clickstreamState.put(id, clickRow);
      long now = ctx.timerService().currentProcessingTime();
      ctx.timerService().registerProcessingTimeTimer(now + 1000);
    }
  }

  public Row join(Row clickRow, Row lookupRow) throws ParseException {
    Row joinedRow = new Row(RowKind.INSERT, 13);
    // row setter join output
    return joinedRow;
  }

  @Override
  public void onTimer(
      long timestamp,
      KeyedCoProcessFunction<Long, Row, Row, Row>.OnTimerContext ctx,
      Collector<Row> out)
      throws Exception {
    Long id = ctx.getCurrentKey();
    Row clickRow = clickstreamState.get(id);
    if (clickRow == null) {
      // already joined in processElement1 (or expired by TTL), nothing left to do
      return;
    }
    if (map.contains(id)) {
      // enrich join
      Row joinRow = join(clickRow, map.get(id));
      out.collect(joinRow);
    } else {
      // lookup entry did not arrive within 1 second, send it to side output - dlq
      ctx.output(outputTag, clickRow);
    }
    clickstreamState.remove(id);
  }
}
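
If it helps to check the control flow without a Flink cluster, the same buffer-then-timeout idea can be modelled in plain Java. This is only a sketch under assumptions: `BufferedJoinSketch`, `onLookup`, `onClick` and `advanceTo` are hypothetical names that are not part of Flink. Ordinary maps stand in for keyed state, strings stand in for `Row`, and an explicit `advanceTo(now)` call stands in for the TimerService firing timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the buffer-then-timeout join (hypothetical, no Flink dependency). */
class BufferedJoinSketch {
    private final long waitMs;
    private final Map<Long, String> lookups = new HashMap<>();       // stands in for the lookup state
    private final Map<Long, String> pendingClicks = new HashMap<>(); // stands in for clickstreamState
    private final Map<Long, Long> deadlines = new HashMap<>();       // stands in for registered timers

    final List<String> joined = new ArrayList<>(); // models the main output
    final List<String> dlq = new ArrayList<>();    // models the side output

    BufferedJoinSketch(long waitMs) {
        this.waitMs = waitMs;
    }

    /** Models processElement1: store the lookup and release a waiting click, if any. */
    void onLookup(long id, String lookup) {
        lookups.putIfAbsent(id, lookup);
        String click = pendingClicks.remove(id);
        if (click != null) {
            deadlines.remove(id); // the pending timeout is no longer needed
            joined.add(click + "+" + lookup);
        }
    }

    /** Models processElement2: join right away, or buffer and set a deadline. */
    void onClick(long id, String click, long now) {
        String lookup = lookups.get(id);
        if (lookup != null) {
            joined.add(click + "+" + lookup);
        } else {
            pendingClicks.put(id, click);
            deadlines.put(id, now + waitMs);
        }
    }

    /** Models onTimer: every deadline at or before 'now' sends its still-unmatched click to the DLQ. */
    void advanceTo(long now) {
        Iterator<Map.Entry<Long, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            long id = entry.getKey();
            long deadline = entry.getValue();
            if (deadline <= now) {
                it.remove();
                String click = pendingClicks.remove(id);
                if (click != null) {
                    dlq.add(click);
                }
            }
        }
    }
}
```

In this model, a click that arrives shortly before its lookup row ends up in `joined`, and only a click whose lookup row never shows up within `waitMs` lands in `dlq`. The sketch also cancels the deadline when a late lookup arrives; in the Flink function, a null check on the buffered row inside onTimer gives the same result.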