I have the following use case.
XML files are written to a Kafka topic, which I want to consume and process with Flink. The XML attributes have to be renamed to match the database table columns. These renames have to be flexible and maintainable from outside the Flink job. Finally, the attributes have to be written to the database. Each XML document represents one database record.
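Because each XML document becomes one database row, the final step boils down to turning one renamed attribute map into one INSERT. A minimal plain-Java sketch, assuming the renamed document is a `Map` from column name to value; the class name `InsertStatementBuilder` and the table name `EVENTS` are hypothetical. Column names are concatenated into the SQL, so this assumes they come only from the trusted mapping, while the values go through `?` placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns one renamed document (column name -> value)
// into a parameterised INSERT statement plus the values to bind, in order.
class InsertStatementBuilder {

    // SQL text with '?' placeholders and the matching bind values.
    static final class Insert {
        final String sql;
        final List<String> values;

        Insert(String sql, List<String> values) {
            this.sql = sql;
            this.values = Collections.unmodifiableList(values);
        }
    }

    static Insert build(String table, Map<String, String> row) {
        // keySet() and values() iterate in the same order for the same map
        List<String> columns = new ArrayList<>(row.keySet());
        List<String> values = new ArrayList<>(row.values());
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        String sql = "INSERT INTO " + table
                + " (" + String.join(", ", columns) + ")"
                + " VALUES (" + placeholders + ")";
        return new Insert(sql, values);
    }

    public static void main(String[] args) {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("EVENT_NAME", "Start");
        row.put("EVENT_TIMESTAMP", "2020-11-18T18:00:00.000");
        Insert insert = build("EVENTS", row);
        System.out.println(insert.sql);    // INSERT INTO EVENTS (EVENT_NAME, EVENT_TIMESTAMP) VALUES (?, ?)
        System.out.println(insert.values); // [Start, 2020-11-18T18:00:00.000]
    }
}
```

If every document ends up with the same set of columns, a single fixed statement like this is what you would typically hand to a JDBC sink; if the column set varies per document, a custom sink may be needed.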
As a second step, some attributes of all XML documents from the last x minutes have to be aggregated.
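To make "aggregate the documents of the last x minutes" concrete: in Flink this is roughly what a `keyBy(...)` followed by `window(TumblingEventTimeWindows.of(...))` computes. The plain-Java sketch below is not Flink code; it assumes fixed, non-overlapping windows aligned to the epoch, counts documents per window and per `eventSource`, and treats the timestamps as UTC. The class name is hypothetical. If "last x minutes" should instead be re-evaluated continuously, a sliding window would be the closer fit.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not Flink code) of a tumbling window of length `size`:
// each event belongs to exactly one window [start, start + size), and events that
// share a window and a key are aggregated together (here: counted).
class TumblingWindowSketch {

    // Start of the window containing the timestamp, aligned to the epoch.
    static long windowStart(long epochMillis, long sizeMillis) {
        return epochMillis - Math.floorMod(epochMillis, sizeMillis);
    }

    // events[i] = {eventTimestamp, eventSource}; timestamps are read as UTC (an assumption).
    // Returns "windowStart|eventSource" -> number of documents.
    static Map<String, Integer> countPerWindow(String[][] events, Duration size) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String[] event : events) {
            long ts = LocalDateTime.parse(event[0]).toInstant(ZoneOffset.UTC).toEpochMilli();
            Instant start = Instant.ofEpochMilli(windowStart(ts, size.toMillis()));
            counts.merge(start + "|" + event[1], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"2020-11-18T18:00:00.000", "Source1"},
            {"2020-11-18T18:00:01.000", "Source2"},
            {"2020-11-18T18:00:01.000", "Source1"},
        };
        // {2020-11-18T18:00:00Z|Source1=2, 2020-11-18T18:00:00Z|Source2=1}
        System.out.println(countPerWindow(events, Duration.ofMinutes(5)));
    }
}
```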
As far as I know, Flink is capable of all the mentioned steps, but I lack an idea of how to implement them correctly.
Currently I have implemented the Kafka source, which retrieves the XML document, and I parse it with a custom MapFunction. There I create a POJO and store each attribute name and value in a HashMap.
```java
public class Data {
    private Map<String, String> attributes = new HashMap<>();
}
```
The HashMap contains entries such as:
Key: path.to.attribute.one, Value: Value of attribute one
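One way the parsing step could produce such keys is to walk the DOM and join element names with dots. A minimal sketch using only the JDK's DOM parser, which could be called from inside the MapFunction; the class `XmlFlattener` is hypothetical, and it assumes that repeated sibling elements are not needed (a later one overwrites an earlier one with the same path) and that XML attributes are stored as `path.to.element.attributeName`.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical helper: flattens an XML document into "dotted path -> value" pairs.
// <path><to><attribute><one>x</one></attribute></to></path> yields
// "path.to.attribute.one" -> "x"; an XML attribute id="7" on <one> would yield
// "path.to.attribute.one.id" -> "7". Repeated sibling elements overwrite each other.
class XmlFlattener {

    static Map<String, String> flatten(String xml) {
        try {
            Element root = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            Map<String, String> result = new LinkedHashMap<>();
            walk(root, root.getTagName(), result);
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse XML document", e);
        }
    }

    private static void walk(Element element, String path, Map<String, String> out) {
        // XML attributes of this element become "path.attributeName"
        NamedNodeMap attributes = element.getAttributes();
        for (int i = 0; i < attributes.getLength(); i++) {
            Node attribute = attributes.item(i);
            out.put(path + "." + attribute.getNodeName(), attribute.getNodeValue());
        }
        // Recurse into child elements; a leaf element stores its trimmed text
        NodeList children = element.getChildNodes();
        boolean hasChildElements = false;
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (child.getNodeType() == Node.ELEMENT_NODE) {
                hasChildElements = true;
                walk((Element) child, path + "." + child.getNodeName(), out);
            }
        }
        if (!hasChildElements) {
            out.put(path, element.getTextContent().trim());
        }
    }
}
```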
Now I would like to use broadcast state to change the original attribute names to the database column names. This is where I am stuck: I have my POJO with the attributes inside the HashMap, but I don't know how to connect it with the mapping via broadcasting.
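Broadcasting does not require flattening the document: the whole attribute map can be the element of the non-broadcast stream, and `processElement` can rename its keys with one lookup each. The plain-Java model below is not Flink API; it mimics the two callbacks of a `BroadcastProcessFunction`, where the map would be the broadcast state obtained via `ctx.getBroadcastState(...)`. The names `DocumentRemapper`, `onMapping` and `onRecord` are hypothetical, and whether unmapped attributes are kept or dropped is a parameter because the requirements don't say.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the two callbacks of a BroadcastProcessFunction:
// onMapping() plays the role of processBroadcastElement (updates the shared mapping),
// onRecord() plays the role of processElement (read-only lookups).
class DocumentRemapper {

    // Stand-in for the broadcast state: attribute path -> database column name
    private final Map<String, String> mappingState = new HashMap<>();

    void onMapping(Map<String, String> mappingUpdate) {
        mappingState.putAll(mappingUpdate);
    }

    // Renames every attribute of one document. Unmapped attributes are kept under
    // their original path or dropped, depending on keepUnmapped (a design choice).
    Map<String, String> onRecord(Map<String, String> attributes, boolean keepUnmapped) {
        Map<String, String> row = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : attributes.entrySet()) {
            String column = mappingState.get(e.getKey());
            if (column != null) {
                row.put(column, e.getValue());
            } else if (keepUnmapped) {
                row.put(e.getKey(), e.getValue());
            }
        }
        return row;
    }
}
```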
Another way would be to flatMap the XML document attributes into single records. This leaves me with two problems:
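Flattening loses the grouping of attributes by document, which each database row needs. A minimal plain-Java sketch, assuming each document is given a hypothetical id, shows how attribute records that carry that id can be regrouped; the class and field names are illustrative. In a streaming job you would additionally need to know when all attributes of a document have arrived.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one record per attribute, each tagged with the id of the
// document it came from, so the attributes can be put back into one row later.
class FlattenAndRegroup {

    static final class AttributeRecord {
        final String documentId;
        final String name;
        final String value;

        AttributeRecord(String documentId, String name, String value) {
            this.documentId = documentId;
            this.name = name;
            this.value = value;
        }
    }

    // The flatMap step: one document -> many attribute records.
    static List<AttributeRecord> flatten(String documentId, Map<String, String> attributes) {
        List<AttributeRecord> records = new ArrayList<>();
        attributes.forEach((name, value) -> records.add(new AttributeRecord(documentId, name, value)));
        return records;
    }

    // The regrouping step (in Flink, state keyed by documentId): records -> one map per document.
    static Map<String, Map<String, String>> regroup(List<AttributeRecord> records) {
        Map<String, Map<String, String>> byDocument = new LinkedHashMap<>();
        for (AttributeRecord record : records) {
            byDocument.computeIfAbsent(record.documentId, id -> new LinkedHashMap<>())
                    .put(record.name, record.value);
        }
        return byDocument;
    }
}
```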
For the second stage I am aware of the window functions. I haven't understood them in every detail yet, but I guess they would fit my requirement. The question at this stage is whether I can use more than one sink in one job: one for the stream of raw data and one for the aggregated data.
Can someone help with a hint?
Cheers
UPDATE: Here is what I have so far. I simplified the code; the XmlData POJO represents my parsed XML document.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingJob {

    static Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        XmlData xmlData1 = new XmlData();
        xmlData1.addAttribute("path.to.attribute.eventName", "Start");
        xmlData1.addAttribute("second.path.to.attribute.eventTimestamp", "2020-11-18T18:00:00.000");
        xmlData1.addAttribute("third.path.to.attribute.eventSource", "Source1");
        xmlData1.addAttribute("path.to.attribute.additionalAttribute", "Lorem");

        XmlData xmlData2 = new XmlData();
        xmlData2.addAttribute("path.to.attribute.eventName", "Start");
        xmlData2.addAttribute("second.path.to.attribute.eventTimestamp", "2020-11-18T18:00:01.000");
        xmlData2.addAttribute("third.path.to.attribute.eventSource", "Source2");
        xmlData2.addAttribute("path.to.attribute.additionalAttribute", "First");

        XmlData xmlData3 = new XmlData();
        xmlData3.addAttribute("path.to.attribute.eventName", "Start");
        xmlData3.addAttribute("second.path.to.attribute.eventTimestamp", "2020-11-18T18:00:01.000");
        xmlData3.addAttribute("third.path.to.attribute.eventSource", "Source1");
        xmlData3.addAttribute("path.to.attribute.additionalAttribute", "Day");

        Mapping mapping1 = new Mapping();
        mapping1.addMapping("path.to.attribute.eventName", "EVENT_NAME");
        mapping1.addMapping("second.path.to.attribute.eventTimestamp", "EVENT_TIMESTAMP");

        DataStream<Mapping> mappingDataStream = env.fromElements(mapping1);

        MapStateDescriptor<String, Mapping> mappingStateDescriptor = new MapStateDescriptor<>(
                "MappingBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Mapping>() {}));

        BroadcastStream<Mapping> mappingBroadcastStream = mappingDataStream.broadcast(mappingStateDescriptor);

        DataStream<XmlData> dataDataStream = env.fromElements(xmlData1, xmlData2, xmlData3);

        // Convert the XML with all attributes to a stream of attribute names and values
        DataStream<Tuple2<String, String>> recordDataStream = dataDataStream
                .flatMap(new CustomFlatMapFunction());

        // Map the attributes with the mapping information
        DataStream<Tuple2<String, String>> outputDataStream = recordDataStream
                .connect(mappingBroadcastStream)
                .process(new CustomBroadcastingFunction());

        // A job needs at least one sink; print() stands in for the database sink here
        outputDataStream.print();

        env.execute("Process xml data and write it to database");
    }

    static class XmlData {
        private Map<String, String> attributes = new HashMap<>();

        public XmlData() {
        }

        public String toString() {
            return this.attributes.toString();
        }

        public Map<String, String> getColumns() {
            return this.attributes;
        }

        public void addAttribute(String key, String value) {
            this.attributes.put(key, value);
        }

        public String getAttributeValue(String attributeName) {
            return attributes.get(attributeName);
        }
    }

    static class Mapping {
        // First string is the attribute path and name
        // Second string is the database column name
        Map<String, String> mappingTuple = new HashMap<>();

        public Mapping() {}

        public void addMapping(String attributeNameWithPath, String databaseColumnName) {
            this.mappingTuple.put(attributeNameWithPath, databaseColumnName);
        }

        public Map<String, String> getMappingTuple() {
            return mappingTuple;
        }

        public void setMappingTuple(Map<String, String> mappingTuple) {
            this.mappingTuple = mappingTuple;
        }
    }

    // Emits one (attribute name, value) tuple per attribute of the document
    static class CustomFlatMapFunction implements FlatMapFunction<XmlData, Tuple2<String, String>> {
        @Override
        public void flatMap(XmlData xmlData, Collector<Tuple2<String, String>> collector) throws Exception {
            for (Map.Entry<String, String> entrySet : xmlData.getColumns().entrySet()) {
                collector.collect(new Tuple2<>(entrySet.getKey(), entrySet.getValue()));
            }
        }
    }

    static class CustomBroadcastingFunction
            extends BroadcastProcessFunction<Tuple2<String, String>, Mapping, Tuple2<String, String>> {
        @Override
        public void processElement(Tuple2<String, String> attribute, ReadOnlyContext readOnlyContext,
                                   Collector<Tuple2<String, String>> collector) throws Exception {
            // TODO: look up the column name for attribute.f0 in the broadcast state
        }

        @Override
        public void processBroadcastElement(Mapping mapping, Context context,
                                            Collector<Tuple2<String, String>> collector) throws Exception {
            // TODO: store the mapping in the broadcast state
        }
    }
}
```
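With the state layout used above (`MapStateDescriptor<String, Mapping>`), the whole `Mapping` object sits under a single key, so `processBroadcastElement` would put the newest `Mapping` under that key and `processElement` would read it back and look up `f0` of the incoming tuple. The plain-Java model below mirrors that logic without Flink; the key `"current"` and the class name `AttributeLevelRemapping` are hypothetical, and `Mapping` is a stripped-down copy of the class above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for the question's setup: the broadcast state maps one fixed
// key to one whole Mapping object, as MapStateDescriptor<String, Mapping> implies.
class AttributeLevelRemapping {

    // Hypothetical single key under which the whole Mapping is stored.
    static final String MAPPING_KEY = "current";

    // Minimal copy of the question's Mapping class (attribute path -> column name).
    static class Mapping {
        final Map<String, String> mappingTuple = new HashMap<>();
    }

    // Stand-in for the broadcast state.
    private final Map<String, Mapping> broadcastState = new HashMap<>();

    // processBroadcastElement: replace the whole mapping with the newest version.
    void onMapping(Mapping mapping) {
        broadcastState.put(MAPPING_KEY, mapping);
    }

    // processElement: rename one (attributeName, value) pair; empty if no mapping is known.
    Optional<Map.Entry<String, String>> onAttribute(String attributeName, String value) {
        Mapping mapping = broadcastState.get(MAPPING_KEY);
        if (mapping == null) {
            return Optional.empty();
        }
        String column = mapping.mappingTuple.get(attributeName);
        if (column == null) {
            return Optional.empty();
        }
        Map.Entry<String, String> renamed = new SimpleEntry<>(column, value);
        return Optional.of(renamed);
    }
}
```

An empty result corresponds to simply not calling `collector.collect(...)`; note that in this sketch an attribute arriving before any mapping is dropped.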
Here's some example code showing how to do this with a BroadcastStream. There's a subtle issue: the attribute remapping data might arrive after some of the records it should apply to. Normally you'd use a timer with state to hold on to any records that are missing remapping data, but in your case it's unclear whether a missing remapping means "wait longer" or "no mapping exists". In any case, this should get you started...
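```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;

// The test class name is illustrative; the members below are the actual example.
public class RemappingTest {

    // Broadcast state: original attribute name -> new attribute name
    private static final MapStateDescriptor<String, String> REMAPPING_STATE =
            new MapStateDescriptor<>("remappings", String.class, String.class);

    @Test
    public void testUnkeyedStreamWithBroadcastStream() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

        // The remapping rules, broadcast to every parallel instance
        List<Tuple2<String, String>> attributeRemapping = new ArrayList<>();
        attributeRemapping.add(new Tuple2<>("one", "1"));
        attributeRemapping.add(new Tuple2<>("two", "2"));
        attributeRemapping.add(new Tuple2<>("three", "3"));
        attributeRemapping.add(new Tuple2<>("four", "4"));
        attributeRemapping.add(new Tuple2<>("five", "5"));
        attributeRemapping.add(new Tuple2<>("six", "6"));
        BroadcastStream<Tuple2<String, String>> attributes = env.fromCollection(attributeRemapping)
                .broadcast(REMAPPING_STATE);

        // The records to remap (stand-ins for the parsed XML documents)
        List<Map<String, Integer>> xmlData = new ArrayList<>();
        xmlData.add(makePOJO("one", 10));
        xmlData.add(makePOJO("two", 20));
        xmlData.add(makePOJO("three", 30));
        xmlData.add(makePOJO("four", 40));
        xmlData.add(makePOJO("five", 50));
        DataStream<Map<String, Integer>> records = env.fromCollection(xmlData);

        records.connect(attributes)
                .process(new MyRemappingFunction())
                .print();

        env.execute();
    }

    private Map<String, Integer> makePOJO(String key, int value) {
        Map<String, Integer> result = new HashMap<>();
        result.put(key, value);
        return result;
    }

    @SuppressWarnings("serial")
    private static class MyRemappingFunction
            extends BroadcastProcessFunction<Map<String, Integer>, Tuple2<String, String>, Map<String, Integer>> {

        // Broadcast side: store each remapping rule in the broadcast state
        @Override
        public void processBroadcastElement(Tuple2<String, String> in, Context ctx,
                                            Collector<Map<String, Integer>> out) throws Exception {
            ctx.getBroadcastState(REMAPPING_STATE).put(in.f0, in.f1);
        }

        // Data side: rename keys that have a remapping, keep all other keys unchanged
        @Override
        public void processElement(Map<String, Integer> in, ReadOnlyContext ctx,
                                   Collector<Map<String, Integer>> out) throws Exception {
            final ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(REMAPPING_STATE);
            Map<String, Integer> result = new HashMap<>();
            for (String key : in.keySet()) {
                if (state.contains(key)) {
                    result.put(state.get(key), in.get(key));
                } else {
                    result.put(key, in.get(key));
                }
            }
            out.collect(result);
        }
    }
}
```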
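Holding on to records whose remapping has not arrived yet, as suggested above, could look like this in plain Java. This sketch assumes a missing key means "not arrived yet" rather than "no mapping exists", and it keeps the buffer in an ordinary list. In Flink the buffer would have to be Flink-managed state, and because timers and keyed state are only available on keyed streams, that points to a `KeyedBroadcastProcessFunction` rather than the unkeyed `BroadcastProcessFunction` used here. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch of "hold records back until their remapping data has arrived".
// The pending list stands in for Flink state; a timer (not modelled here) would
// decide when to stop waiting and emit or drop a record anyway.
class BufferingRemapper {
    private final Map<String, String> mapping = new HashMap<>();
    private final List<Map<String, Integer>> pending = new ArrayList<>();
    private final Consumer<Map<String, Integer>> out;

    BufferingRemapper(Consumer<Map<String, Integer>> out) {
        this.out = out;
    }

    // Broadcast side: store the remapping, then retry every buffered record.
    void onRemapping(String from, String to) {
        mapping.put(from, to);
        List<Map<String, Integer>> retry = new ArrayList<>(pending);
        pending.clear();
        retry.forEach(this::onRecord);
    }

    // Data side: emit the renamed record if every key has a remapping, else buffer it.
    void onRecord(Map<String, Integer> record) {
        if (!mapping.keySet().containsAll(record.keySet())) {
            pending.add(record);
            return;
        }
        Map<String, Integer> result = new HashMap<>();
        record.forEach((key, value) -> result.put(mapping.get(key), value));
        out.accept(result);
    }

    int pendingCount() {
        return pending.size();
    }
}
```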