python, apache-flink, flink-streaming

Flink Streaming Python API - reduce() produces incremental results instead of a final value


I am trying to implement the k-means clustering algorithm on Flink using the Python streaming API. I apply key_by on the element at index 0 and then call reduce() on each group to get a kind of count aggregate (the summed point plus a count) per cluster.

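# Flink's (Jython-based) streaming Python API imports the Java interfaces by package
from org.apache.flink.api.common.functions import ReduceFunction
from org.apache.flink.api.java.functions import KeySelector

class CentroidAccumulator(ReduceFunction):
    # Combines two (centroid_id, point, count) tuples that share the same key
    def reduce(self, val1, val2):
        id1, point1, count1 = val1
        id2, point2, count2 = val2
        return (id1, point1.add(point2), count1 + count2)

class Selector(KeySelector):
    # Keys each tuple by its centroid id (index 0)
    def getKey(self, value):
        return value[0]


nearest_points = points \
    .map(SelectNearestPoint(centroids)) \
    .key_by(Selector()) \
    .reduce(CentroidAccumulator())
nearest_points.write_as_text("output.txt")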

Expected Result:

(1, <tuple>, count)
(2, <tuple>, count)
(3, <tuple>, count)
(4, <tuple>, count)

Actual Result:

Instead, the output of every intermediate step is written to the file. I have 40 points in the sample I am testing with, so the output has 40 lines like this:

(1, <kmeans_clustering.Point instance at 0x2>, 1)
(3, <kmeans_clustering.Point instance at 0x3>, 1)
(2, <kmeans_clustering.Point instance at 0x4>, 1)
(2, <kmeans_clustering.Point instance at 0x5>, 2)
.
.
.
(2, <kmeans_clustering.Point instance at 0x20>, 13)
(2, <kmeans_clustering.Point instance at 0x21>, 14)
(1, <kmeans_clustering.Point instance at 0x22>, 10)
(4, <kmeans_clustering.Point instance at 0x23>, 4)
(2, <kmeans_clustering.Point instance at 0x24>, 15)
(2, <kmeans_clustering.Point instance at 0x25>, 16)
(1, <kmeans_clustering.Point instance at 0x26>, 11)
(4, <kmeans_clustering.Point instance at 0x27>, 5)
(2, <kmeans_clustering.Point instance at 0x28>, 17)
(2, <kmeans_clustering.Point instance at 0x29>, 18)

It is reducing correctly, but I want only the last value of the reduce transformation for each group (which, as I understand it, is how reduce should work). What am I doing wrong?


Solution

  • You're not doing anything wrong; this is the expected behavior for a streaming reduce function. Conceptually, a datastream is an endless flow of data, so it doesn't make sense to "wait until the end" to produce a result. The standard behavior for streaming programs is to produce an updated result for every event. That is why 40 input points give 40 output lines: each line is the running aggregate for its key at the moment that point arrived.

    Of course, this can be a bit inconvenient. If you only want to see the final result, there has to be some way to signal that the end has come. With batch programs this comes naturally. In streaming applications, a finite source emits a final watermark with the value MAX_WATERMARK when its input is exhausted, and that can be used to detect the end. You could catch it in a ProcessFunction on the keyed stream by registering an event-time timer (for example, for Long.MAX_VALUE, which only fires when MAX_WATERMARK arrives), but that's a somewhat complex solution. You could also use windows as a workaround, so that a result is emitted once per window rather than once per event.
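The difference between the two behaviors can be modelled in plain Python, without Flink. This is a hypothetical simulation, not Flink API: `rolling_keyed_reduce` mimics a keyed streaming reduce (one output per input, with the first record for each key passed through unchanged), while `final_keyed_reduce` mimics the end-of-input approach, holding per-key state and emitting once when a marker standing in for MAX_WATERMARK arrives. Ints replace the asker's `Point` objects, and `END_OF_INPUT`, `accumulate`, `by_id`, and both function names are made up for illustration.

```python
from typing import Tuple

# Plain-Python stand-in: ints replace the asker's Point objects.
Record = Tuple[int, int, int]  # (centroid_id, summed_point, count)

# Hypothetical end-of-input marker modelling MAX_WATERMARK (not a Flink object).
END_OF_INPUT = object()


def accumulate(val1: Record, val2: Record) -> Record:
    """Same shape as CentroidAccumulator.reduce, using + instead of Point.add."""
    id1, point1, count1 = val1
    _id2, point2, count2 = val2
    return (id1, point1 + point2, count1 + count2)


def by_id(record: Record) -> int:
    """Key selector: the centroid id at index 0."""
    return record[0]


def rolling_keyed_reduce(events, key_fn, reduce_fn):
    """Model of a streaming keyed reduce: emit the updated value for every event."""
    state = {}
    for event in events:
        if event is END_OF_INPUT:
            continue  # watermarks are not records; a rolling reduce never sees them
        key = key_fn(event)
        # The first event for a key becomes its state and is emitted unchanged.
        state[key] = reduce_fn(state[key], event) if key in state else event
        yield state[key]


def final_keyed_reduce(events, key_fn, reduce_fn):
    """Model of 'keep per-key state, emit once when the end-of-input timer fires'."""
    state = {}
    for event in events:
        if event is END_OF_INPUT:
            # Like an event-time timer firing on MAX_WATERMARK: one value per key.
            yield from state.values()
            return
        key = key_fn(event)
        state[key] = reduce_fn(state[key], event) if key in state else event
    # No end marker (a truly unbounded stream): nothing is ever emitted.


events = [(1, 10, 1), (3, 5, 1), (2, 7, 1), (2, 3, 1), (1, 2, 1), END_OF_INPUT]

print(list(rolling_keyed_reduce(events, by_id, accumulate)))
print(list(final_keyed_reduce(events, by_id, accumulate)))
```

With five input records, the rolling version prints five running aggregates, which is the "40 lines for 40 points" effect from the question, while the final version prints one tuple per key. The order across keys in the final output here simply follows dict insertion order; a real Flink job should not be relied on to emit keys in any particular order.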