
Computing variables before each iteration using the DataSet API in Apache Flink


I am working with the K-Means clustering example that ships with Flink and trying to extend it. The goal is to reduce the number of distance computations by precomputing the distances between every pair of centroids, so that they can be looked up in a double[][] array. This array must be recomputed at the beginning of each iteration and broadcast to the step that assigns points to clusters.

I have tried the following:

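import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Maps a pair of centroids to (id of first, id of second, Euclidean distance between them).
public static final class ComputeCentroidInterDistance implements MapFunction<Tuple2<Centroid, Centroid>, Tuple3<Integer, Integer, Double>> {

    @Override
    public Tuple3<Integer, Integer, Double> map(Tuple2<Centroid, Centroid> pair) throws Exception {
        return new Tuple3<>(pair.f0.id, pair.f1.id, pair.f0.euclideanDistance(pair.f1));
    }
}

DataSet<Centroid> centroids = getCentroidDataSet(..);

// The cross of the centroids with themselves yields every ordered pair,
// including each centroid paired with itself (distance 0).
DataSet<Tuple3<Integer, Integer, Double>> distances = centroids
    .crossWithTiny(centroids)
    .map(new ComputeCentroidInterDistance());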

However, I don't see how the distances DataSet can serve my use case, because its elements are not returned in any particular order that would let me look up the distance between two given centroids. Is there a better way of doing this?


Solution

  • DataSets are inherently unordered and sharded, both of which make them ill-suited to your use case.

    What you want to do is first collect all centroids in a single method invocation.

    DataSet<double[][]> matrix = centroids.reduceGroup(...)
    

    Within the reduceGroup function you have access to all elements, so you can perform the whole calculation there. The output should be your double[][] matrix.

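    The matrix can then be distributed as a broadcast set (withBroadcastSet) to the function that assigns points to clusters. To have it recomputed in every iteration, derive it from the iteration's centroid DataSet inside the loop rather than from the initial centroids.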
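
The calculation performed inside reduceGroup could look roughly like the plain-Java sketch below. It is not the Flink example's code: the Centroid class here is a hypothetical stand-in with an int id and two coordinates, the class and method names are made up for illustration, and it assumes that centroid ids are distinct, small, non-negative integers so they can be used directly as array indices. In a Flink job, the body of compute would sit inside a GroupReduceFunction<Centroid, double[][]> and emit the matrix through the Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CentroidDistanceMatrix {

    /** Hypothetical stand-in for the example's Centroid: an id plus 2D coordinates. */
    static final class Centroid {
        final int id;
        final double x;
        final double y;

        Centroid(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }

        double euclideanDistance(Centroid other) {
            double dx = x - other.x;
            double dy = y - other.y;
            return Math.sqrt(dx * dx + dy * dy);
        }
    }

    /**
     * Builds a symmetric matrix where matrix[a][b] is the distance between
     * the centroids with ids a and b. The input order is irrelevant because
     * entries are stored by id. Assumes distinct, non-negative ids; the
     * matrix is sized by the largest id, so unused ids leave zero rows.
     */
    static double[][] compute(Iterable<Centroid> centroids) {
        // Materialize the group first: every centroid is needed more than once.
        List<Centroid> all = new ArrayList<>();
        int maxId = -1;
        for (Centroid c : centroids) {
            all.add(c);
            maxId = Math.max(maxId, c.id);
        }

        double[][] matrix = new double[maxId + 1][maxId + 1];
        for (int i = 0; i < all.size(); i++) {
            for (int j = i + 1; j < all.size(); j++) {
                Centroid a = all.get(i);
                Centroid b = all.get(j);
                // Distance is symmetric, so compute once and store twice.
                double d = a.euclideanDistance(b);
                matrix[a.id][b.id] = d;
                matrix[b.id][a.id] = d;
            }
        }
        return matrix;
    }

    public static void main(String[] args) {
        List<Centroid> centroids = Arrays.asList(
                new Centroid(1, 3.0, 4.0),
                new Centroid(0, 0.0, 0.0));
        // prints [[0.0, 5.0], [5.0, 0.0]]
        System.out.println(Arrays.deepToString(compute(centroids)));
    }
}
```

Because each entry is stored under the centroid ids rather than under arrival position, the unordered nature of the DataSet no longer matters. On the consuming side, a rich function can read the broadcast matrix with getRuntimeContext().getBroadcastVariable(...) in its open method and keep it in a field for the lookups.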