Search code examples
Tags: apache-flink, flink-streaming

Flink connect streams using KeyedCoProcessFunction


For a 1:1 join I'm using a KeyedCoProcessFunction with two streams: a lookup stream (100 records per second) and a clickstream (10,000 records per second). In the processElement2 method I look up the key in a MapState<Long, Row>. If it is found, I enrich the clickstream record with it; otherwise I send the record to a side output, which is later sunk to Kafka. I haven't used any window on either input stream. On the DLQ topic in Kafka I continuously see 1-2 records produced per second. How can I wait a few milliseconds for the lookup id to arrive in processElement2 before pushing the record to the side output?

// Key both inputs by the lookup id so matching lookup and click rows meet in the same keyed state.
val keyedLookups = lookDataStream.keyBy(row -> row.<Long>getFieldAs("id"));
val keyedClicks = clickDataStream.keyBy(row -> row.<Long>getFieldAs("lookupid"));
val joinStream = keyedLookups.connect(keyedClicks).process(new EnrichJoinFunction());
/**
 * Enriches click-stream rows with the lookup row that shares their key.
 *
 * <p>Input 1 is the lookup stream, keyed by its {@code "id"} field. Input 2 is the click stream,
 * keyed by its {@code "lookupid"} field. A click whose lookup row has not been seen yet is
 * emitted to the {@code "side-output"} tag instead of the main output.
 */
public static class EnrichJoinFunction
      extends KeyedCoProcessFunction<Long, Row, Row, Row> {


    final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};

    /** Lookup rows by id (one entry per key), with a 15-day TTL. */
    private MapState<Long, Row> map = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      MapStateDescriptor<Long, Row> lookupStateDescriptor =
          new MapStateDescriptor<Long, Row>(
              "state",
              TypeInformation.of(Long.class),
              TypeInformation.of(new TypeHint<Row>() {}));
      lookupStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
      map = getRuntimeContext().getMapState(lookupStateDescriptor);
    }

    /** Stores the first lookup row seen for the current key; later duplicates are ignored. */
    @Override
    public void processElement1(
        Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
      // The lookup stream is keyed by its "id" field, so the current key is that id.
      Long id = ctx.getCurrentKey();
      if (!map.contains(id)) {
        map.put(id, lookupRow);
      }
    }

    /**
     * Joins a click with the stored lookup row for its key, or routes it to the side output
     * when no lookup row has arrived yet.
     */
    @Override
    public void processElement2(
        Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
        throws Exception {
      log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));

      // Clicks are keyed by "lookupid". Use that key, not the click's own "id" field, because
      // keyed state only holds the lookup row stored under the current key.
      Long id = ctx.getCurrentKey();
      Row lookupRow = map.get(id);

      if (lookupRow != null) {
        // enrich join
        out.collect(join(clickRow, lookupRow));
      } else {
        // lookup entry not yet arrived, send it to side output - dlq
        ctx.output(outputTag, clickRow);
      }
    }

    /** Builds the enriched output row from a click row and its lookup row. */
    public Row join(Row clickRow, Row lookupRow) throws ParseException {
      Row joinedRow = new Row(RowKind.INSERT, 13);
      // row setters for the join output
      return joinedRow;
    }
}}

Solution

  • You can use TimerService to achieve this.

    The idea is to store the clickstream rows that have no immediately matching lookup data in a dedicated MapState and to register a processing-time (or event-time) timer that fires some time later. In the timer callback, try again to join the lookup data with the clickstream data. If there is still no match, send the click event to the side output.

    It might look as follows:

    public static class EnrichJoinFunction
          extends KeyedCoProcessFunction<Long, Row, Row, Row> {
    
    
        final OutputTag<Row> outputTag = new OutputTag<Row>("side-output") {};
    
        private MapState<Long, Row> map = null;
        private MapState<Long, Row> clickstreamState = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
          MapStateDescriptor<Long, Row> MapStateDescriptor =
              new MapStateDescriptor<Long, Row>(
                  "state",
                  TypeInformation.of(Long.class),
                  TypeInformation.of(new TypeHint<Row>() {}));
          MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(15)).build());
          /*MapStateDescriptor.setQueryable("test");*/
          map = getRuntimeContext().getMapState(MapStateDescriptor);
    
          MapStateDescriptor<Long, Row> clickstreamStateMapStateDescriptor =
              new MapStateDescriptor<Long, Row>(
                  "clickstreamState",
                  TypeInformation.of(Long.class),
                  TypeInformation.of(new TypeHint<Row>() {}));
          clickstreamState MapStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(1)).build());
          clickstreamState = getRuntimeContext().getMapState(clickstreamStateMapStateDescriptor);
        }
    
        @Override
        public void processElement1(
            Row lookupRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
          log.debug("Received Lookup Record" + RowUtils.printRow(lookupRow));
          Long id = lookupRow.<Long>getFieldAs("id");
          if (!map.contains(id)) {
            map.put(id, lookupRow);
          }
    
          // join immediately any matching click events, waiting for counterpart
          if (clickstreamState.contains(id)) {
              // enrich join
              Row joinRow = join(clickstreamState.get(id), lookupRow);
              out.collect(joinRow);
              clickstreamState.remove(id)
          } 
        }
    
        @Override
        public void processElement2(
            Row clickRow, KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out)
            throws Exception {
          log.debug("Received Click stream Record" + RowUtils.printRow(clickRow));
    
          Long id = clickRow.<Long>getFieldAs("id");
    
          if (map.contains(id)) {
              // enrich join
              Row joinRow = join(clickRow, map.get(id));
              out.collect(joinRow);
          } else {
            // put in state and check in 1 second
            clickstreamState.put(id, clickRow)
            Long currTimestamp = ctx.timestamp()
            ctx.timerService().registerProcessingTimeTimer(currTimestamp + 1000)
          }
        }
    
        public Row join(Row clickRow, Row lookupRow) throws ParseException {
          Row joinedRow = new Row(RowKind.INSERT, 13);
          // row setter join ouput
          return joinedRow;
        }
    
        @Override
        public void onTimer(
          Long timestamp,
          KeyedCoProcessFunction<Long, Row, Row, Row>.Context ctx, Collector<Row> out
        ) {
           Long id = ctx.getCurrentKey
           Row clickRow = clickstreamState.get(id)
           if (map.contains(id)) {
              // enrich join
              val joinRow = join(clickRow, map.get(id));
              out.collect(joinRow);
           } else {
              // lookup entry not arrived even in 1 second, send it to side output - dlq
              ctx.output(outputTag, clickRow);
           }
           clickstreamState.remove(id)
      }
    }}