Tags: java, apache-flink

Apache Flink Dynamic Pipeline


I'm working on a framework that allows customers to create their own plugins for my software, which is built on Apache Flink. The snippet below outlines what I'm trying to get working (just as a proof of concept), but uploading it fails with org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

I want to be able to branch the input stream into an arbitrary number of pipelines and then combine them back into a single output. What I have below is just the simplified version I'm starting with.

public class ContentBase {

  public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kf-service:9092");
    properties.setProperty("group.id", "varnost-content");

    // Setup up execution environment and get stream from Kafka
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
                    new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
            .map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));


    // Create a new List of Streams, one for each "rule" that is being executed
    // For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
    List<String> codes = Arrays.asList("404", "200", "500");
    List<DataStream<ObjectNode>> outputs = new ArrayList<>();
    for (String code : codes) {
      outputs.add(MyClass.filter(logs, "response", code));
    }

    // It seemed as though I needed a seed DataStream to union all others on 
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode seedObject = (ObjectNode) mapper.readTree("{\"start\":\"true\"");
    DataStream<ObjectNode> alerts = see.fromElements(seedObject);

    // Union the output of each "rule" above with the seed object to then output
    for (DataStream<ObjectNode> output : outputs) {
      alerts.union(output);
    }


    // Convert to string and sink to Kafka
    alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
            .addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
    see.execute();
  }
}
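
For reference, `MyClass.filter` is a thin wrapper over Flink's `.filter`, roughly along these lines (a simplified sketch, not my exact implementation):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
// Depending on the Flink version, ObjectNode may come from Flink's shaded Jackson instead
import com.fasterxml.jackson.databind.node.ObjectNode;

public class MyClass {
  // Keep only events whose `field` equals `value` (e.g. response == "404")
  public static DataStream<ObjectNode> filter(DataStream<ObjectNode> input,
                                              String field, String value) {
    return input.filter((FilterFunction<ObjectNode>) node ->
        node.has(field) && value.equals(node.get(field).asText()));
  }
}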

I can't figure out how to get the actual error out of the Flink web interface to add that information here.


Solution

  • There were a few errors I found.

1) A StreamExecutionEnvironment can apparently only have one input (I could be wrong about that), so adding the extra .fromElements source was a problem. (Note that the seed JSON string above is also missing its closing brace, so mapper.readTree would throw as soon as main runs.)

2) I forgot that DataStreams are immutable, so calling .union doesn't modify the stream it's called on; it returns a new DataStream, which my loop was discarding (a minimal illustration follows).
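
    For illustration, the loop from the question could also have been fixed simply by keeping each result (a minimal equivalent of the reduce used below):

    // Reassign on every iteration; calling union and discarding its
    // return value (as the original loop did) has no effect.
    DataStream<ObjectNode> alerts = outputs.get(0);
    for (DataStream<ObjectNode> output : outputs.subList(1, outputs.size())) {
      alerts = alerts.union(output);
    }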

The final result ended up being much simpler:

    public class ContentBase {
    
      public static void main(String[] args) throws Exception {
    
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kf-service:9092");
        properties.setProperty("group.id", "varnost-content");
    
        // Setup up execution environment and get stream from Kafka
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
                        new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
                .map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));
    
    
        // Create a new List of Streams, one for each "rule" that is being executed
        // For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
        List<String> codes = Arrays.asList("404", "200", "500");
        List<DataStream<ObjectNode>> outputs = new ArrayList<>();
        for (String code : codes) {
          outputs.add(MyClass.filter(logs, "response", code));
        }
    
        // Fold all rule outputs into one stream; union returns a new
        // DataStream, and reduce yields an Optional that is empty only
        // if `outputs` itself is empty, so unwrap it before use.
        DataStream<ObjectNode> alerts = outputs.stream()
                .reduce(DataStream::union)
                .orElseThrow(() -> new IllegalStateException("no rule outputs to union"));
    
    
        // Convert to string and sink to Kafka
        alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
                .addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
        see.execute();
      }
    }
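
    Since `codes` above is non-empty, the reduce always yields a stream here, so no seed element is needed; the Optional returned by reduce is only empty when there are no rule outputs at all, which the orElseThrow guards against explicitly.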