apache-flink, flink-streaming

Reduce function behaviour in keyed streams


For one of our use cases, we need to redo some calculations whenever a file changes and then broadcast the result so that we can use it in another stream.

The lifecycle of the program looks roughly like this:

Datastream 1: Monitored file -> detect some changes -> reprocess all elements in the file -> calculate one result -> broadcast

Datastream 2: Some transformation -> do something for every element in DS2, using all currently broadcast elements (some temporary data loss in the broadcast elements is tolerable)

Here is some code to illustrate the problem.

This is DS1: it maps every element, sends the results to a reducer, and computes the total:

env.readFile(format, clientPath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)
    .map(new Adder())
    // Every map produced by Adder has exactly one entry, so all elements share the key 1.
    .keyBy(Map::size)
    .reduce(new Reducer());

This is the mapping phase; it simply creates a HashMap from a line:

public static class Adder extends RichMapFunction<String, Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> map(String string) throws Exception {
    // Split on the first '=' only, so that values containing '=' stay intact.
    // A line without '=' would still fail at strings[1].
    String[] strings = string.split("=", 2);
    HashMap<String, String> hashMap = new HashMap<>();
    hashMap.put(strings[0], strings[1]);
    return hashMap;
  }
}

This is the last step, the reducer. It takes the maps coming from the mappers and merges them into a single HashMap, the total:

public static class Reducer extends RichReduceFunction<Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
    // stringStringMap is the previously reduced value for this key; t1 is the new element.
    stringStringMap.putAll(t1);
    return stringStringMap;
  }
}

DS1 is then broadcast as follows:

MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("Brodcasted map state", Types.STRING, Types.STRING);
BroadcastStream<Map<String, String>> broadcastedProperties = clientProperties.broadcast(descriptor);
ds2.connect(broadcastedProperties).process(new EventListener(properties));

Given the following file contents over time:

Time    Document
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

When I run the program, this is what I expect:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

What I am seeing is this:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K1=V1, K2=V2
T3      K1=V4, K2=V2, K3=V3

To work around this, I put a window on the data stream and used an aggregate function with an accumulator instead of a reducer, but I would prefer a non-windowed approach.

I did some debugging and realized that, although the map phase only maps the elements available at that time, the reduce phase reduces the previous state (that is, the result at time T-1) together with all elements at that point. I find it odd to have an invisible state in the reduce phase. From my point of view, the result should be based only on the elements coming directly from the mappers. Maybe my understanding of reduce in Flink is wrong, and I would appreciate some clarification.
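The behaviour described above can be reproduced outside Flink with a small plain-Java model. This is only a sketch of the idea, not Flink's actual implementation: the class `RollingReduceModel` and its methods are hypothetical names, and the model assumes that the operator keeps the last reduced value per key and merges every new element into it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a rolling reduce on a keyed stream (hypothetical, not Flink code).
class RollingReduceModel {
    // The "invisible" per-key state: the last reduced value for each key.
    private final Map<Integer, Map<String, String>> state = new HashMap<>();

    // Called once per incoming element; returns what the operator would emit.
    Map<String, String> onElement(int key, Map<String, String> element) {
        Map<String, String> result = new TreeMap<>();
        Map<String, String> previous = state.get(key);
        if (previous != null) {
            result.putAll(previous); // start from the result of the previous step
        }
        result.putAll(element);      // same merge as Reducer.reduce
        state.put(key, result);
        return new TreeMap<>(result);
    }

    // Feeds every "k=v" line of one file read and returns the last emitted map.
    Map<String, String> onFileRead(String... lines) {
        Map<String, String> last = null;
        for (String line : lines) {
            String[] kv = line.split("=", 2);
            Map<String, String> single = new HashMap<>();
            single.put(kv[0], kv[1]);
            // Mirrors keyBy(Map::size): the key is always 1.
            last = onElement(single.size(), single);
        }
        return last;
    }

    public static void main(String[] args) {
        RollingReduceModel model = new RollingReduceModel();
        System.out.println("T1 " + model.onFileRead("K1=V1", "K2=V2")); // T1 {K1=V1, K2=V2}
        System.out.println("T2 " + model.onFileRead("K2=V2"));          // T2 {K1=V1, K2=V2}
        System.out.println("T3 " + model.onFileRead("K3=V3", "K1=V4")); // T3 {K1=V4, K2=V2, K3=V3}
    }
}
```

Because every element lands on the same key and the state is never cleared, the model reproduces the "What I am seeing" table rather than the expected one.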


Solution

  • Yes, when any of Flink's built-in aggregators (sum, max, reduce, etc.) is applied to a stream, it aggregates the entire stream in an incremental, stateful way. More precisely, these aggregations are applied to KeyedStreams and computed key by key, but in an ongoing, unbounded way. For example, if you used sum() on the stream of integers 1, 2, 3, 4, 5, ..., then sum() would produce the stream 1, 3, 6, 10, 15, ... . In your case, reduce() produces an ever-updating stream of maps that accumulates more and more key/value pairs.

    If you were to key the stream by time, you should get the results you are looking for, but the keyed state would still be held forever, which would probably be problematic. I suggest you either use the window API, or something like a RichFlatMapFunction or a ProcessFunction, where you can manage the state directly.
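Both points can be illustrated with a minimal plain-Java sketch. It is a model of the idea only and does not use the Flink API: `KeyedRollingReduce`, `runningSums`, `mergeMaps`, and the time labels "T1" to "T3" used as keys are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java model of a keyed rolling aggregation (hypothetical, not Flink code).
class KeyedRollingReduce<K, T> {
    private final Map<K, T> state = new HashMap<>(); // one entry per key, never removed
    private final BinaryOperator<T> reducer;

    KeyedRollingReduce(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Combines the stored value for this key with the new one and emits the result.
    T onElement(K key, T value) {
        return state.merge(key, value, reducer);
    }

    int keysHeld() {
        return state.size();
    }

    // Rolling sum over a single key, as in the sum() example.
    static List<Integer> runningSums(int... values) {
        KeyedRollingReduce<String, Integer> sum = new KeyedRollingReduce<>(Integer::sum);
        List<Integer> emitted = new ArrayList<>();
        for (int v : values) {
            emitted.add(sum.onElement("all", v));
        }
        return emitted;
    }

    // Non-mutating merge of two maps.
    static Map<String, String> mergeMaps(Map<String, String> a, Map<String, String> b) {
        Map<String, String> merged = new TreeMap<>(a);
        merged.putAll(b);
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(runningSums(1, 2, 3, 4, 5)); // [1, 3, 6, 10, 15]

        // Keying by time: each file read gets its own key, so reads no longer mix.
        KeyedRollingReduce<String, Map<String, String>> byTime =
            new KeyedRollingReduce<String, Map<String, String>>(KeyedRollingReduce::mergeMaps);
        byTime.onElement("T1", Collections.singletonMap("K1", "V1"));
        System.out.println(byTime.onElement("T1", Collections.singletonMap("K2", "V2"))); // {K1=V1, K2=V2}
        System.out.println(byTime.onElement("T2", Collections.singletonMap("K2", "V2"))); // {K2=V2}
        byTime.onElement("T3", Collections.singletonMap("K3", "V3"));
        System.out.println(byTime.onElement("T3", Collections.singletonMap("K1", "V4"))); // {K1=V4, K3=V3}

        // The state for T1 and T2 is still held, which is the problem noted above.
        System.out.println(byTime.keysHeld()); // 3
    }
}
```

In this model the per-time results match the expected table, but `keysHeld()` grows by one with every file read. That is why a window, or a function that clears its own state, is the more practical choice.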