Multiple Apache Flink windows validations

I'm just getting started on stream processing using Apache Flink, the thing is that I'm receiving a stream of Json that look like this:


  token_id: “tok_afgtryuo”,

  ip_address: ““,

  device_fingerprint: “abcghift”,

  card_hash: “hgtyuigash”,

  “bin_number”: “424242”,

  “last4”: “4242”,

  “name”: “Seu Jorge”


And was asked if i could fulfill the following business rules:

  • Decline if number of tokens > 5 for this IP in last 10 seconds

  • Decline if number of tokens > 15 for this IP in last minute

  • Decline if number of tokens > 60 for this IP in last hour

I made 2 classes, main class when I'm making an instance to call the Window function with different parameters to avoid duplicate code:

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //This DataStream Would be  Converting the Json to a Token Object
        DataStream<Token> baseStream =
                env.addSource(new SocketTextStreamFunction("localhost",
                        .map(new MapTokens());

        // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
       DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
               5, "seconds").print();

        //2 -Decline if number of tokens > 15 for this IP in last minute
        DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                62, "minutes").print();

        //3- Decline if number of tokens > 60 for this IP in last hour
        DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                60, "Hours").print();


And another class where I'm doing all the logic for rules, I'm counting the times where an IP address appears and, if it is more than the allowed number in the time window I'm returning a message with some information:

public class RuleMaker {

    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                String tokenProp,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){

                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {

                         String tokenSelection = "";
                        switch (tokenProp)
                            case "ip":
                                tokenSelection = token.getIpAddress();
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));

    //Class to process the elements from the window
    private class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            > {

        private Integer _maxPetitions;
        private String  _ruleType;

        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;

        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {

            Integer counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                counter += element.f1++;
                if(counter > _maxPetitions){
                    out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                    counter = 0;

So far, i think this code is working but I'm a begginer on Apache Flink, and I'll appreciate a lot if you could tell me if it's something wrong about the way I'm trying to work with this and point me to the right direction.

  • General approach looks very good, although I would have thought that Table API would be powerful enough to help you (more concise) which supports Json out of the box.

    If you want to stick to DataStream API, in getStreamKeyCount, the switch around tokenProp should be replaced by passing a key extractor to getStreamKeyCount to have only one place to add new rules.

    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                KeySelector<Token, String> keyExtractor,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){
        return stream
             .map(token -> new Tuple3<>(keyExtractor.getKey(token), 1, token.get_tokenId()))
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));

    Then the invocation becomes

    DataStreamSink<String> response2 = ruleMaker.getStreamKeyCount(baseStream, 
        Token::getIpAddress, Time.minutes(1), 62, "minutes");