Search code examples
java · json · apache-kafka · apache-flink · siddhi

Not able to process kafka json message with Flink siddhi library


I am trying to create a simple application that consumes a Kafka message, applies a CQL transform, and publishes the result back to Kafka. The code is below:

JAVA: 1.8 Flink: 1.13 Scala: 2.11 flink-siddhi: 2.11-0.2.2-SNAPSHOT

I am using library: https://github.com/haoch/flink-siddhi

input json to Kafka:

{
   "awsS3":{
      "ResourceType":"aws.S3",
      "Details":{
         "Name":"crossplane-test",
         "CreationDate":"2020-08-17T11:28:05+00:00"
      },
      "AccessBlock":{
         "PublicAccessBlockConfiguration":{
            "BlockPublicAcls":true,
            "IgnorePublicAcls":true,
            "BlockPublicPolicy":true,
            "RestrictPublicBuckets":true
         }
      },
      "Location":{
         "LocationConstraint":"us-west-2"
      }
   }
}

main class:

/**
 * Command-line entry point: launches the Siddhi-backed streaming job.
 */
public class S3SidhiApp {
    /**
     * Starts the job by delegating to {@code internalStreamSiddhiApp.start()}.
     *
     * @param args command-line arguments; not read
     */
    public static void main(final String[] args) {
        // Alternative launcher, currently disabled:
        //kafkaStreamApp.start();
        internalStreamSiddhiApp.start();
    }
}

App class:

package flinksidhi.app;

import com.google.gson.JsonObject;
import flinksidhi.event.s3.source.S3EventSource;

import io.siddhi.core.SiddhiManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.json.JSONObject;


import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static flinksidhi.app.connector.Consumers.createInputMessageConsumer;
import static flinksidhi.app.connector.Producer.*;

public class internalStreamSiddhiApp {

    private static final String inputTopic = "EVENT_STREAM_INPUT";
    private static final String outputTopic = "EVENT_STREAM_OUTPUT";
    private static final String consumerGroup = "EVENT_STREAM1";
    private static final String kafkaAddress = "localhost:9092";
    private static final String zkAddress = "localhost:2181";

    private static final String S3_CQL1 = "from inputStream select * insert into temp";
    private static final String S3_CQL = "from inputStream select json:toObject(awsS3) as obj insert into temp;" +
            "from temp select json:getString(obj,'$.awsS3.ResourceType') as affected_resource_type," +
            "json:getString(obj,'$.awsS3.Details.Name') as affected_resource_name," +
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration') as encryption," +
            "json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm') as algorithm insert into temp2; " +
            "from temp2 select  affected_resource_name,affected_resource_type, " +
            "ifThenElse(encryption == ' ','Fail','Pass') as state," +
            "ifThenElse(encryption != ' ' and algorithm == 'aws:kms','None','Critical') as severity insert into outputStream";


    public static void start(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //DataStream<String> inputS = env.addSource(new S3EventSource());

        //Flink kafka stream consumer
        FlinkKafkaConsumer<String> flinkKafkaConsumer =
                createInputMessageConsumer(inputTopic, kafkaAddress,zkAddress, consumerGroup);

        //Add Data stream source -- flink consumer
        DataStream<String> inputS = env.addSource(flinkKafkaConsumer);
        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

        cep.registerExtension("json:toObject", io.siddhi.extension.execution.json.function.ToJSONObjectFunctionExtension.class);
        cep.registerExtension( "json:getString", io.siddhi.extension.execution.json.function.GetStringJSONFunctionExtension.class);
        cep.registerStream("inputStream", inputS, "awsS3");


        inputS.print();

        System.out.println(cep.getDataStreamSchemas());
        //json needs extension jars to present during runtime.
        DataStream<Map<String,Object>> output = cep
                .from("inputStream")
                .cql(S3_CQL1)
                .returnAsMap("temp");


        //Flink kafka stream Producer
        FlinkKafkaProducer<Map<String, Object>> flinkKafkaProducer =
                createMapProducer(env,outputTopic, kafkaAddress);

        //Add Data stream sink -- flink producer
        output.addSink(flinkKafkaProducer);
        output.print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

Consumer class:

package flinksidhi.app.connector;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.json.JSONObject;

import java.util.Properties;
/**
 * Factory methods for the Kafka sources used by the job.
 */
public class Consumers {
    /**
     * Creates a Kafka consumer that reads each record as a {@code String}.
     *
     * @param topic        topic to subscribe to
     * @param kafkaAddress value for the {@code bootstrap.servers} property
     * @param zookeeprAddr value for the {@code zookeeper.connect} property
     * @param kafkaGroup   value for the {@code group.id} property
     * @return a consumer configured with the three properties above
     */
    public static FlinkKafkaConsumer<String> createInputMessageConsumer(String topic, String kafkaAddress, String zookeeprAddr, String kafkaGroup ) {
        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty("group.id", kafkaGroup);
        kafkaProps.setProperty("zookeeper.connect", zookeeprAddr);
        kafkaProps.setProperty("bootstrap.servers", kafkaAddress);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps);
    }
}

Producer class:

package flinksidhi.app.connector;

import flinksidhi.app.util.ConvertJavaMapToJson;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.json.JSONObject;

import java.util.Map;

/**
 * Factory methods for the Kafka sinks used by the job.
 */
public class Producer {

    /**
     * Creates a Kafka producer for {@code Tuple2} records, serialized with
     * {@code AverageSerializer}.
     *
     * @param env          not used by this method; kept for signature compatibility
     * @param topic        destination topic
     * @param kafkaAddress Kafka broker list
     * @return the configured producer
     */
    public static FlinkKafkaProducer<Tuple2> createStringProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Tuple2>(kafkaAddress, topic, new AverageSerializer());
    }

    /**
     * Creates a Kafka producer that writes each map as a JSON document.
     *
     * <p>The JSON text comes from {@code ConvertJavaMapToJson.convert} and is always
     * encoded as UTF-8, so the bytes on the topic do not depend on the JVM's default charset.
     *
     * @param env          not used by this method; kept for signature compatibility
     * @param topic        destination topic
     * @param kafkaAddress Kafka broker list
     * @return the configured producer
     */
    public static FlinkKafkaProducer<Map<String,Object>> createMapProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {

        return new FlinkKafkaProducer<Map<String,Object>>(kafkaAddress, topic, new SerializationSchema<Map<String, Object>>() {
            @Override
            public void open(InitializationContext context) throws Exception {
                // Nothing to initialize.
            }

            @Override
            public byte[] serialize(Map<String, Object> stringObjectMap) {
                String json = ConvertJavaMapToJson.convert(stringObjectMap);
                // Explicit charset: the no-argument getBytes() uses the platform default.
                return json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }
        });
    }
}

I have tried many things, but the code where the CQL is invoked is never called, and no error is reported either. I am not sure where it is going wrong.

If I instead create an internal stream source and feed it the same input JSON, returning the result as a string, it works.


Solution

  • Initial guess: if you are using event time, are you sure you have defined watermarks correctly? As stated in the docs:

    (...) an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed (...)

    If this doesn't help, I would suggest to decompose/simplify the job to a bare minimum, for example just a source operator and some naive sink printing/logging elements. And if that works, start adding back operators one by one. You could also start by simplifying your CEP pattern as much as possible.