Tags: java, junit, apache-flink, flink-streaming

Why is Apache Flink dropping events from the datastream?


In the following unit test, a number of events (given by numberOfElements) is generated and fed in as a data stream. The test randomly fails at this line:

assertEquals(numberOfElements, CollectSink.values.size());

Is there any explanation for why Apache Flink is skipping events?

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;

public class FlinkTest {

    StreamExecutionEnvironment env;

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.createLocalEnvironment();
    }

    @Test
    public void testStream1() throws Exception {
        testStream();
    }

    @Test
    public void testStream2() throws Exception {
        testStream();
    }

    @Test
    public void testStream3() throws Exception {
        testStream();
    }

    @Test
    public void testStream4() throws Exception {
        testStream();
    }

    @Test
    public void testStream() throws Exception {

        final int numberOfElements = 50;

        DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
        CollectSink.values.clear();
        tupleStream.addSink(new CollectSink());
        env.execute();
        sleep(2000);

        assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
        assertEquals(numberOfElements, CollectSink.values.size());
    }

    public static List<Tuple2<String, Integer>> getCollectionOfBucketImps(int numberOfElements) throws InterruptedException {
        List<Tuple2<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < numberOfElements; i++) {
            records.add(new Tuple2<>(Integer.toString(i % 10), i));
        }
        return records;
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {

        public static final List<Tuple2<String, Integer>> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            values.add(value);
        }
    }
}

For example, any of the testStreamX cases fails randomly.

Context: the code runs with a parallelism of 8, since the CPU it runs on has 8 cores.
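For context on where that 8 comes from: createLocalEnvironment() without an explicit argument defaults the parallelism to the number of cores the JVM reports. A quick plain-Java check (no Flink needed):

```java
public class CoreCount {
    public static void main(String[] args) {
        // createLocalEnvironment() falls back to this value when no
        // parallelism is passed explicitly
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(cores);
    }
}
```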


Solution

  • I don't know the parallelism of your jobs (I suppose it is the maximum that Flink can assign). It looks like you have a race condition on the adds in your sink: the static values list is shared, but each parallel sink instance synchronizes only on itself.

    Solution

    I ran your example code, setting the environment parallelism to 1, and everything works fine. The examples about testing in the documentation use this solution (link to documentation).

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
    }
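    The reason parallelism 1 fixes it: with higher parallelism, each parallel sink subtask receives its own deserialized copy of CollectSink, so the synchronized modifier on invoke locks a different instance in every subtask while all of them append to the same static list. Here is a plain-Java sketch of that situation (no Flink; the Sink class and counts are illustrative) showing that one shared monitor — here the class itself — keeps every add:

```java
import java.util.ArrayList;
import java.util.List;

public class SharedLockDemo {
    // shared across all sink instances, like CollectSink.values
    static final List<Integer> values = new ArrayList<>();

    static class Sink {
        void invoke(int value) {
            // lock the class, not `this`: every instance uses the same monitor
            synchronized (SharedLockDemo.class) {
                values.add(value);
            }
        }
    }

    static int run(int subtasks, int perSubtask) throws InterruptedException {
        values.clear();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < subtasks; t++) {
            Sink sink = new Sink();            // one instance per subtask
            Thread thread = new Thread(() -> {
                for (int i = 0; i < perSubtask; i++) sink.invoke(i);
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) thread.join();
        return values.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(8, 1000)); // prints 8000: no adds are lost
    }
}
```

    With `synchronized (this)` semantics (a plain synchronized method) and eight separate Sink instances, the adds would race and the final size could come up short, which is exactly the flaky assertion in the question.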
    

    Even Better

    You can set the parallelism to 1 only on the sink operator and maintain the parallelism of the rest of the pipeline. In the following example, I added an extra map function with a forced parallelism of 8 for the map operator (note that MapFunction comes from org.apache.flink.api.common.functions).

    public void testStream() throws Exception {
    
        final int numberOfElements = 50;
    
        DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
        CollectSink.values.clear();
        tupleStream
                .map(new MapFunction<Tuple2<String,Integer>, Tuple2<String,Integer>>() {
                    @Override
                    public Tuple2<String,Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
    
                        stringIntegerTuple2.f0 += "- concat something";
    
                        return stringIntegerTuple2;
                    }
                }).setParallelism(8)
                .addSink(new CollectSink()).setParallelism(1);
        env.execute();
        sleep(2000);
    
        assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
        assertEquals(numberOfElements, CollectSink.values.size());
    }
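    If you would rather keep the sink at full parallelism, another option is to make the shared collection itself thread-safe, for example by wrapping it in Collections.synchronizedList. A minimal plain-Java sketch of the idea (the Flink wiring is omitted; the writer counts are illustrative):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListDemo {
    // a thread-safe stand-in for CollectSink.values
    static final List<Integer> values =
            Collections.synchronizedList(new ArrayList<>());

    static int run(int writers, int perWriter) throws InterruptedException {
        values.clear();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < writers; t++) {
            Thread thread = new Thread(() -> {
                // adds are serialized by the synchronized wrapper
                for (int i = 0; i < perWriter; i++) values.add(i);
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) thread.join();
        return values.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(8, 1000)); // prints 8000: no adds are lost
    }
}
```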