I'm trying to convert a DataStream of object A into a DataStream of object B, where one A can produce a list of Bs. As shown in the example below, I am reading a DataStream<LogEvent> from a Flink consumer and need to convert it to a DataStream<MappedMetric> so I can run filters and aggregations over a timeWindow on the MappedMetric objects. A single LogEvent may produce a list of MappedMetric objects, so if I use a MapFunction, the result would be a DataStream<List<MappedMetric>>. However, I don't think aggregations can be run on a DataStream<List<MappedMetric>>. Any help is highly appreciated. Thanks in advance.
// Input Object
public class LogEvent {
    private String id;
    private long timestamp;
    private List<LogMessage> message;
}

public class LogMessage {
    private String accountId;
    private List<Metric> metrics;
}

public class Metric {
    private String name;
    private double value;
}

// Should be transformed to
public class MappedMetric {
    private String accountId;
    private String name;
    private double value;
    private long timestamp;
}

final DataStream<LogEvent> inputDataStream = **read from Flink consumer**
final DataStream<MappedMetric> aggregatedMetrics = inputDataStream
    .map(**SomeMapFunction**)
    .keyBy(**SomeKey**)
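You want to use flatMap with a FlatMapFunction, which can emit zero, one, or many results for a single input by calling the Collector it is given. Each result is a single MappedMetric record, not a list, so the output is a DataStream<MappedMetric> that you can key, window, and aggregate.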
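As a rough sketch of the flattening logic such a function might contain: the Collector interface below is a local stand-in that mirrors the shape of Flink's org.apache.flink.util.Collector, included only so the snippet compiles without Flink on the classpath. The class name LogEventFlattener, the constructors, and the package-private fields are assumptions made for illustration and are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;

// ASSUMPTION: local stand-in for org.apache.flink.util.Collector,
// so this sketch compiles without the Flink dependency.
interface Collector<T> {
    void collect(T record);
}

// Simplified versions of the question's classes. Package-private final
// fields and constructors are assumptions made to keep the sketch short.
class Metric {
    final String name;
    final double value;
    Metric(String name, double value) { this.name = name; this.value = value; }
}

class LogMessage {
    final String accountId;
    final List<Metric> metrics;
    LogMessage(String accountId, List<Metric> metrics) {
        this.accountId = accountId;
        this.metrics = metrics;
    }
}

class LogEvent {
    final String id;
    final long timestamp;
    final List<LogMessage> message;
    LogEvent(String id, long timestamp, List<LogMessage> message) {
        this.id = id;
        this.timestamp = timestamp;
        this.message = message;
    }
}

class MappedMetric {
    final String accountId;
    final String name;
    final double value;
    final long timestamp;
    MappedMetric(String accountId, String name, double value, long timestamp) {
        this.accountId = accountId;
        this.name = name;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical class name. In a real job this would implement
// org.apache.flink.api.common.functions.FlatMapFunction<LogEvent, MappedMetric>
// and use Flink's own Collector instead of the stand-in above.
class LogEventFlattener {
    public void flatMap(LogEvent event, Collector<MappedMetric> out) {
        if (event.message == null) {
            return; // nothing to emit for this event
        }
        for (LogMessage message : event.message) {
            if (message.metrics == null) {
                continue; // skip messages that carry no metrics
            }
            for (Metric metric : message.metrics) {
                // Emit one record per metric; the event timestamp is copied onto each.
                out.collect(new MappedMetric(
                        message.accountId, metric.name, metric.value, event.timestamp));
            }
        }
    }
}
```

In the job itself, the .map(**SomeMapFunction**) step would then become something like inputDataStream.flatMap(new LogEventFlattener()), which yields a DataStream<MappedMetric> rather than a stream of lists, so keyBy and the window aggregation operate on individual metrics.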