Transferring multiple DataSets to the next iteration in Apache Flink


In the original K-means clustering example provided with Flink, each point is assigned to a new centroid in each iteration, and the information about which centroid the point was assigned to is not carried over into the next iteration. My goal is to carry this information over to the next iteration.

The first solution I tried was to assign each point to a non-existent centroid with an ID of 0 before the loop and then update this DataSet throughout the iterations. This is how I would have done it in a regular loop, but I now realize that using the iteration feature in Flink is not quite the same as using a regular loop. The code for this is shown below.

DataSet<Tuple2<Integer, Point>> clusteredPoints = nullClusteredPoints;

IterativeDataSet<Centroid> loop = centroids.iterate(iterations);

// Assign each point to the nearest centroid
clusteredPoints = clusteredPoints
        // compute closest centroid for each point
        .map(new SelectNearestCenter())
        .withBroadcastSet(loop, "centroids");

DataSet<Centroid> newCentroids = clusteredPoints
        // count and sum point coordinates for each centroid
        .map(new CountAppender())
        .groupBy(0).reduce(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());

// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

My expectation here was that the DataSet clusteredPoints would be used in each iteration, and then after the last iteration this DataSet would consist of the final clustered points. However, when trying to execute this, the following exception occurs.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

Another solution I tried was to use delta iterations and put the DataSet of points in the solution set so that it would be fed to the next iteration. This did not work either, because the only operations allowed on the solution set are Join and CoGroup, according to the exception below.

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Error: The only operations allowed on the solution set are Join and CoGroup.

A third solution I tried was to read the DataSet of points from disk at the beginning of each iteration and write it back to disk at the end of the iteration (which would probably be highly inefficient). However, writing to disk returns a DataSink, so the first exception shown above occurs for this solution as well.

Are there better solutions that I could try? Or do Flink iterations not support a use case like this?


Solution

  • Flink's iterations currently support only one moving dataset. This restriction is what gives them their nice runtime properties: all static data is kept in memory and the moving dataset is streamed through. In theory, Flink could support more than one, but there are many cases in which these properties would no longer hold.

    In your case, you could resolve the issue by joining the two datasets into a single dataset of clusters (centroidWithPoints), where each centroid also stores the list of points assigned to it.

    Alternatively, you could use a tagged union to combine both datasets into one and then split it up again at the beginning of the next iteration.
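
The tagged-union idea can be sketched in plain Java, independent of Flink. This is only an illustration under assumptions: the `Element` class, the `step`, `run`, and `sample` methods, and the sample data are hypothetical, and `java.util` lists stand in for DataSets. In a real Flink job, the split would presumably be two `filter` calls on the iteration's DataSet and the recombination a `union` before `closeWith`, with a similar tagged type as the element type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaggedUnionKMeans {

    /** One element of the single "moving" collection: a centroid or an assigned point. */
    static final class Element {
        final boolean isCentroid; // the tag that tells the two kinds apart
        final int id;             // centroid ID, or ID of the centroid a point is assigned to
        final double x, y;

        Element(boolean isCentroid, int id, double x, double y) {
            this.isCentroid = isCentroid;
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    /** One iteration: split by tag, reassign points, recompute centroids, recombine. */
    static List<Element> step(List<Element> moving) {
        // Split the combined collection back into its two logical datasets.
        List<Element> centroids = new ArrayList<>();
        List<Element> points = new ArrayList<>();
        for (Element e : moving) {
            (e.isCentroid ? centroids : points).add(e);
        }

        List<Element> next = new ArrayList<>();
        Map<Integer, double[]> sums = new HashMap<>(); // centroid ID -> {sumX, sumY, count}
        for (Element p : points) {
            // p.id still holds the assignment from the previous iteration here.
            int best = p.id;
            double bestDist = Double.MAX_VALUE;
            for (Element c : centroids) {
                double d = (p.x - c.x) * (p.x - c.x) + (p.y - c.y) * (p.y - c.y);
                if (d < bestDist) {
                    bestDist = d;
                    best = c.id;
                }
            }
            next.add(new Element(false, best, p.x, p.y));
            double[] s = sums.computeIfAbsent(best, k -> new double[3]);
            s[0] += p.x;
            s[1] += p.y;
            s[2] += 1;
        }

        // Recompute each centroid as the mean of its points; a centroid
        // without points keeps its old position.
        for (Element c : centroids) {
            double[] s = sums.get(c.id);
            next.add(s == null ? c : new Element(true, c.id, s[0] / s[2], s[1] / s[2]));
        }
        return next; // points and centroids travel together as one collection
    }

    /** Feeds the combined collection through a fixed number of iterations. */
    static List<Element> run(List<Element> initial, int iterations) {
        List<Element> moving = initial;
        for (int i = 0; i < iterations; i++) {
            moving = step(moving);
        }
        return moving;
    }

    /** Hypothetical sample data: four points assigned to dummy centroid 0, two centroids. */
    static List<Element> sample() {
        List<Element> data = new ArrayList<>();
        data.add(new Element(false, 0, 0.0, 0.0));
        data.add(new Element(false, 0, 0.0, 2.0));
        data.add(new Element(false, 0, 10.0, 10.0));
        data.add(new Element(false, 0, 10.0, 12.0));
        data.add(new Element(true, 1, 1.0, 1.0));
        data.add(new Element(true, 2, 9.0, 9.0));
        return data;
    }

    public static void main(String[] args) {
        for (Element e : run(sample(), 2)) {
            System.out.println((e.isCentroid ? "centroid " : "point -> ")
                    + e.id + " (" + e.x + ", " + e.y + ")");
        }
    }
}
```

Because each point carries its current centroid ID inside the one moving collection, the previous assignment is available at the start of every iteration, and the final result contains both the clustered points and the final centroids.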