Tags: java, apache-flink, flink-streaming

Apache Flink: process XML and write it to a database


I have the following use case.

XML files are written to a Kafka topic, which I want to consume and process via Flink. The XML attributes have to be renamed to match the database table columns. These renames have to be flexible and maintainable from outside the Flink job. At the end, the attributes have to be written to the database. Each XML document represents one database record.

As a second step, some attributes of all XML documents from the last x minutes have to be aggregated.

As far as I know, Flink is capable of all the mentioned steps, but I'm lacking an idea of how to implement it correctly.

Currently I have implemented the Kafka source, retrieve the XML document, and parse it via a custom MapFunction. There I create a POJO and store each attribute name and value in a HashMap.

public class Data {
    private Map<String, String> attributes = new HashMap<>();
}

The HashMap then contains entries like:

Key: path.to.attribute.one, Value: value of attribute one
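
The parsing step looks roughly like the sketch below. It is only a sketch: the XmlToDataMapFunction name, the flatten helper, and a getAttributes() getter on Data are assumptions for illustration, not the exact production code.

import java.io.StringReader;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.flink.api.common.functions.MapFunction;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class XmlToDataMapFunction implements MapFunction<String, Data> {

    @Override
    public Data map(String xml) throws Exception {
        //DocumentBuilder is not serializable, so create it per call
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        Document doc = builder.parse(new InputSource(new StringReader(xml)));

        Data data = new Data();
        flatten(doc.getDocumentElement(), "", data.getAttributes());
        return data;
    }

    //Recursively walk the DOM and build dotted paths like "path.to.attribute.one"
    private void flatten(Element element, String prefix, Map<String, String> out) {
        String path = prefix.isEmpty() ? element.getTagName() : prefix + "." + element.getTagName();
        NodeList children = element.getChildNodes();
        boolean hasElementChild = false;
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (child.getNodeType() == Node.ELEMENT_NODE) {
                hasElementChild = true;
                flatten((Element) child, path, out);
            }
        }
        if (!hasElementChild) {
            out.put(path, element.getTextContent().trim());
        }
    }
}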

Now I would like to use broadcast state to change the original attribute names to the database column names. At this stage I am stuck: I have my POJO Data with the attributes inside the HashMap, but I don't know how to connect it with the mapping via broadcast.

Another way would be to flatMap the XML document attributes into single records. This leaves me with two problems (see the sketch after this list):

  • How to ensure that attributes from one document don't get mixed up with those from another document within the stream
  • How to merge all the attributes of one document back together to insert them as one record into the database
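
A minimal sketch of that second approach, assuming each document carries a unique id. The getDocId() getter, the Tuple3 layout, and the 5-second session gap are made up for illustration; the statements go inside the job's main method, with dataStream being the DataStream<Data> from the parsing step.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

//Explode each document into (docId, attributeName, attributeValue) triples
DataStream<Tuple3<String, String, String>> attributes = dataStream
        .flatMap((Data data, Collector<Tuple3<String, String, String>> out) -> {
            for (Map.Entry<String, String> e : data.getAttributes().entrySet()) {
                out.collect(Tuple3.of(data.getDocId(), e.getKey(), e.getValue()));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));

//Keying by docId keeps the attributes of one document separate from all others;
//a session window then folds them back into one map per document
DataStream<Map<String, String>> reassembled = attributes
        .keyBy(t -> t.f0)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .aggregate(new AggregateFunction<Tuple3<String, String, String>, Map<String, String>, Map<String, String>>() {
            @Override
            public Map<String, String> createAccumulator() {
                return new HashMap<>();
            }

            @Override
            public Map<String, String> add(Tuple3<String, String, String> t, Map<String, String> acc) {
                acc.put(t.f1, t.f2);
                return acc;
            }

            @Override
            public Map<String, String> getResult(Map<String, String> acc) {
                return acc;
            }

            @Override
            public Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
                a.putAll(b);
                return a;
            }
        });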

For the second stage, I am aware of the window functions, even if I haven't understood them in every detail, but I guess they would fit my requirement. The question at this stage would be whether I can use more than one sink in one job, where one would be a stream of the raw data and one of the aggregated data.
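
For illustration, this is the kind of fan-out I have in mind; the event values and the one-minute window are just made up:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoSinksExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> events = env.fromElements(
                Tuple2.of("Source1", 1), Tuple2.of("Source2", 1), Tuple2.of("Source1", 1));

        //"Sink" 1: the raw stream (print stands in for the database sink)
        events.print("raw");

        //"Sink" 2: a windowed count per source, consumed from the same stream
        events.keyBy(t -> t.f0)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(1)
                .print("aggregated");

        env.execute("two sinks from one stream");
    }
}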

Can someone help with a hint?

Cheers

UPDATE: Here is what I've got so far. I simplified the code; the XmlData POJO represents my parsed XML document.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {
    static Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        XmlData xmlData1 = new XmlData();
        xmlData1.addAttribute("path.to.attribute.eventName","Start");
        xmlData1.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:00.000");
        xmlData1.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData1.addAttribute("path.to.attribute.additionalAttribute","Lorem");

        XmlData xmlData2 = new XmlData();
        xmlData2.addAttribute("path.to.attribute.eventName","Start");
        xmlData2.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData2.addAttribute("third.path.to.attribute.eventSource","Source2");
        xmlData2.addAttribute("path.to.attribute.additionalAttribute","First");

        XmlData xmlData3 = new XmlData();
        xmlData3.addAttribute("path.to.attribute.eventName","Start");
        xmlData3.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData3.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData3.addAttribute("path.to.attribute.additionalAttribute","Day");

        Mapping mapping1 = new Mapping();
        mapping1.addMapping("path.to.attribute.eventName","EVENT_NAME");
        mapping1.addMapping("second.path.to.attribute.eventTimestamp","EVENT_TIMESTAMP");

        DataStream<Mapping> mappingDataStream = env.fromElements(mapping1);

        MapStateDescriptor<String, Mapping> mappingStateDescriptor = new MapStateDescriptor<String, Mapping>(
                "MappingBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Mapping>() {}));

        BroadcastStream<Mapping> mappingBroadcastStream = mappingDataStream.broadcast(mappingStateDescriptor);

        DataStream<XmlData> dataDataStream = env.fromElements(xmlData1, xmlData2, xmlData3);

        //Convert the xml with all attributes to a stream of attribute names and values
        DataStream<Tuple2<String, String>> recordDataStream = dataDataStream
                .flatMap(new CustomFlatMapFunction());

        //Map the attributes with the mapping information
        DataStream<Tuple2<String, String>> outputDataStream = recordDataStream
                .connect(mappingBroadcastStream)
                .process(new CustomBroadcastingFunction());

        env.execute("Process xml data and write it to database");
    }
    
    static class XmlData{
        private Map<String,String> attributes = new HashMap<>();

        public XmlData(){
        }

        public String toString(){
            return this.attributes.toString();
        }

        public Map<String,String> getColumns(){
            return this.attributes;
        }

        public void addAttribute(String key, String value){
            this.attributes.put(key,value);
        }

        public String getAttributeValue(String attributeName){
            return attributes.get(attributeName);
        }
    }
    
    static class Mapping{
        //First string is the attribute path and name
        //Second string is the database column name
        Map<String,String> mappingTuple = new HashMap<>();

        public Mapping(){}

        public void addMapping(String attributeNameWithPath, String databaseColumnName){
            this.mappingTuple.put(attributeNameWithPath,databaseColumnName);
        }

        public Map<String, String> getMappingTuple() {
            return mappingTuple;
        }

        public void setMappingTuple(Map<String, String> mappingTuple) {
            this.mappingTuple = mappingTuple;
        }
    }

    static class CustomFlatMapFunction implements FlatMapFunction<XmlData, Tuple2<String,String>> {

        @Override
        public void flatMap(XmlData xmlData, Collector<Tuple2< String,String>> collector) throws Exception {
            for(Map.Entry<String,String> entrySet : xmlData.getColumns().entrySet()){
                collector.collect(new Tuple2<>(entrySet.getKey(), entrySet.getValue()));
            }
        }
    }

    static class CustomBroadcastingFunction
            extends BroadcastProcessFunction<Tuple2<String, String>, Mapping, Tuple2<String, String>> {

        @Override
        public void processElement(Tuple2<String, String> attribute, ReadOnlyContext readOnlyContext,
                Collector<Tuple2<String, String>> collector) throws Exception {
            //This is where I'm stuck: how do I apply the broadcast mapping here?
        }

        @Override
        public void processBroadcastElement(Mapping mapping, Context context,
                Collector<Tuple2<String, String>> collector) throws Exception {
            //...and how do I store the mapping into the broadcast state?
        }
    }
}

Solution

  • Here's some example code of how to do this using a BroadcastStream. There's a subtle issue where the attribute remapping data might show up after one of the records. Normally you'd use a timer with state to hold onto any records that are missing remapping data, but in your case it's unclear whether a missing remapping is a "need to wait longer" or "no mapping exists". In any case, this should get you started...

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;

//Wrapper class and imports added so the snippet compiles as-is
public class RemappingTest {

    private static MapStateDescriptor<String, String> REMAPPING_STATE =
            new MapStateDescriptor<>("remappings", String.class, String.class);

    @Test
    public void testUnkeyedStreamWithBroadcastStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

        List<Tuple2<String, String>> attributeRemapping = new ArrayList<>();
        attributeRemapping.add(new Tuple2<>("one", "1"));
        attributeRemapping.add(new Tuple2<>("two", "2"));
        attributeRemapping.add(new Tuple2<>("three", "3"));
        attributeRemapping.add(new Tuple2<>("four", "4"));
        attributeRemapping.add(new Tuple2<>("five", "5"));
        attributeRemapping.add(new Tuple2<>("six", "6"));

        BroadcastStream<Tuple2<String, String>> attributes = env.fromCollection(attributeRemapping)
                .broadcast(REMAPPING_STATE);

        List<Map<String, Integer>> xmlData = new ArrayList<>();
        xmlData.add(makePOJO("one", 10));
        xmlData.add(makePOJO("two", 20));
        xmlData.add(makePOJO("three", 30));
        xmlData.add(makePOJO("four", 40));
        xmlData.add(makePOJO("five", 50));

        DataStream<Map<String, Integer>> records = env.fromCollection(xmlData);

        records.connect(attributes)
                .process(new MyRemappingFunction())
                .print();

        env.execute();
    }

    private Map<String, Integer> makePOJO(String key, int value) {
        Map<String, Integer> result = new HashMap<>();
        result.put(key, value);
        return result;
    }

    @SuppressWarnings("serial")
    private static class MyRemappingFunction
            extends BroadcastProcessFunction<Map<String, Integer>, Tuple2<String, String>, Map<String, Integer>> {

        @Override
        public void processBroadcastElement(Tuple2<String, String> in, Context ctx,
                Collector<Map<String, Integer>> out) throws Exception {
            //Remember every (originalName -> columnName) pair in the broadcast state
            ctx.getBroadcastState(REMAPPING_STATE).put(in.f0, in.f1);
        }

        @Override
        public void processElement(Map<String, Integer> in, ReadOnlyContext ctx,
                Collector<Map<String, Integer>> out) throws Exception {
            final ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(REMAPPING_STATE);

            //Rebuild the record, replacing attribute names that have a mapping
            Map<String, Integer> result = new HashMap<>();
            for (String key : in.keySet()) {
                if (state.contains(key)) {
                    result.put(state.get(key), in.get(key));
                } else {
                    result.put(key, in.get(key));
                }
            }

            out.collect(result);
        }
    }
}
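
What's still missing from the original use case is the database write. Below is a hedged sketch of that last step using JdbcSink from the flink-connector-jdbc dependency, replacing the print() in the test above. The SQL, table and column names, connection URL, and driver are placeholders; writing one row per map entry only fits the demo's Map<String, Integer> records, while in the real job each XML document would map to one row.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.util.Collector;

//remapped is the output of MyRemappingFunction above
DataStream<Map<String, Integer>> remapped = records.connect(attributes)
        .process(new MyRemappingFunction());

remapped
        .flatMap((Map<String, Integer> map, Collector<Tuple2<String, Integer>> out) -> {
            for (Map.Entry<String, Integer> e : map.entrySet()) {
                out.collect(Tuple2.of(e.getKey(), e.getValue()));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .addSink(JdbcSink.sink(
                "INSERT INTO events (event_key, event_value) VALUES (?, ?)",
                (statement, t) -> {
                    statement.setString(1, t.f0);
                    statement.setInt(2, t.f1);
                },
                JdbcExecutionOptions.builder().withBatchSize(100).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:postgresql://localhost:5432/mydb")
                        .withDriverName("org.postgresql.Driver")
                        .build()));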