Search code examples
java testing apache-flink flink-streaming

Function executes successfully in production but not in test in Flink


I have written an integration test in Flink 1.12.3, which tests the execute method of the StreamingJob class. Surprisingly, this method outputs records to the sink successfully in the production environment, but it doesn't output anything in local tests. How can I solve this and enable testing?

This may be related

    // Sink instance handed to the StreamingJob under test; the test method prints
    // deviceIdSink.values after the job has executed.
    private static final DeviceIdSink deviceIdSink = new DeviceIdSink();
    
    // JUnit class rule wrapping Flink's MiniClusterWithClientResource, configured with
    // 2 task managers x 1 slot each = 2 slots in total, which matches the
    // env.setParallelism(2) call in the test method.
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(2)
                            .build());
    
    // Builds a StreamingJob on top of an in-memory collection of events, runs it, and
    // prints whatever ended up in the shared sink.
    @Test
    public void testingAStreamingJob() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        List<JsonNode> events = getListFromResource("events.json");
        // fromCollection() yields a bounded stream: once these events are consumed the
        // source is exhausted and the job terminates, so results still pending in
        // time-based windows are never emitted.
        DataStream<JsonNode> testStream = env.fromCollection(events);

        // NOTE(review): Time.seconds(60) is presumably the window size used by the job;
        // confirm against StreamingJob.
        StreamingJob job = new StreamingJob(env, Time.seconds(60),
                 testStream, deviceIdSink);


        job.execute();

        // Dump what the sink collected during the run.
        System.out.println(deviceIdSink.values);


    ```

Solution

  • Once the testStream source is exhausted, the job will terminate. So if you have any time-based windowing happening, you'll have pending results that never get emitted.

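    I use a MockSource that can pause between elements, which keeps the job alive longer. Note that, as written below, it stops once its elements are exhausted or the cancel() method is called, whichever comes first, e.g.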

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * A very simple, non-parallel source based on a list of elements, intended for tests.
     *
     * <p>The elements are serialized into a byte array when the source is constructed and are
     * deserialized again, one at a time, while the source runs. An optional delay can be
     * inserted after each emitted element (see {@link #setDelay(Time)}).
     *
     * <p>By default {@link #run(SourceContext)} returns as soon as every element has been
     * emitted or {@link #cancel()} has been called, whichever happens first. Call
     * {@link #setWaitForCancel(boolean) setWaitForCancel(true)} to keep the source (and thus
     * the job) alive after the last element until {@link #cancel()} is called.
     *
     * @param <T> the type of the elements emitted by this source
     */
    @SuppressWarnings("serial")
    public class MockSource<T> implements SourceFunction<T>, ResultTypeQueryable<T>, Serializable {

        private static final Logger LOGGER = LoggerFactory.getLogger(MockSource.class);

        /** How often the idle loop re-checks the running flag when waiting for cancel(). */
        private static final long CANCEL_POLL_MILLIS = 50L;

        private final int listSize;
        private final byte[] elementsSerialized;
        private final TypeInformation<T> typeInfo;
        private final TypeSerializer<T> serializer;
        private Time delay = null;
        private boolean waitForCancel = false;

        // Set to true at the start of run() and cleared by cancel(); volatile because
        // cancel() is invoked from a different thread than run().
        private transient volatile boolean running;

        /**
         * Creates a source that emits no elements.
         *
         * @param typeInfo type information for the (absent) elements
         * @throws IOException if serializing the (empty) element list fails
         */
        public MockSource(TypeInformation<T> typeInfo) throws IOException {
            this(Collections.emptyList(), typeInfo);
        }

        /**
         * Creates a source from the given elements. At least one element is required, since
         * the type information is derived from the first one.
         *
         * @param elements the elements to emit, in order
         * @throws IOException if serializing the elements fails
         * @throws IllegalArgumentException if no elements are given
         */
        @SafeVarargs
        public MockSource(T... elements) throws IOException {
            // The array is only read (wrapped as a list and iterated), never written to.
            this(Arrays.asList(elements));
        }

        /**
         * Creates a source from {@code data}, which cannot be empty (if it is, use the
         * constructor that takes a {@code typeInfo} argument instead).
         *
         * @param data the elements to emit, in order
         * @throws IOException if serializing the elements fails
         * @throws IllegalArgumentException if {@code data} is null or empty
         */
        public MockSource(List<T> data) throws IOException {
            this(data, typeInfoFromFirstElement(data));
        }

        /**
         * Creates a source from {@code data} using explicitly provided type information.
         *
         * @param data the elements to emit, in order; may be empty
         * @param typeInfo type information used to create the element serializer
         * @throws IOException if serializing the elements fails
         */
        public MockSource(List<T> data, TypeInformation<T> typeInfo) throws IOException {
            this.typeInfo = typeInfo;
            this.serializer = typeInfo.createSerializer(new ExecutionConfig());

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos);

            int count = 0;
            try {
                for (T element : data) {
                    serializer.serialize(element, wrapper);
                    count++;
                }
            } catch (Exception e) {
                throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
            }

            this.listSize = count;
            this.elementsSerialized = baos.toByteArray();
        }

        /** Derives type information from the first element, rejecting null or empty input. */
        private static <E> TypeInformation<E> typeInfoFromFirstElement(List<E> data) {
            if (data == null || data.isEmpty()) {
                throw new IllegalArgumentException(
                        "MockSource needs at least one element to infer its type; "
                                + "use the constructor that takes a TypeInformation for an empty source");
            }
            return TypeExtractor.getForObject(data.get(0));
        }

        /**
         * Sets the pause inserted after each emitted element.
         *
         * @param delay the pause after each element, or {@code null} for no pause
         * @return this source, for call chaining
         */
        public MockSource<T> setDelay(Time delay) {
            this.delay = delay;
            return this;
        }

        /**
         * Controls what happens once all elements have been emitted.
         *
         * @param waitForCancel if {@code true}, {@link #run(SourceContext)} keeps idling after
         *     the last element until {@link #cancel()} is called; if {@code false} (the
         *     default), it returns right away
         * @return this source, for call chaining
         */
        public MockSource<T> setWaitForCancel(boolean waitForCancel) {
            this.waitForCancel = waitForCancel;
            return this;
        }

        /**
         * Emits the elements in order, pausing after each one if a delay is configured.
         *
         * @param ctx the context used to emit elements and to obtain the checkpoint lock
         * @throws Exception if an element cannot be deserialized, or if the thread is
         *     interrupted while sleeping
         */
        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            running = true;
            final Object lock = ctx.getCheckpointLock();

            final DataInputView input =
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(elementsSerialized));

            int emitted = 0;
            while (running && (emitted < this.listSize)) {
                T next;
                try {
                    next = serializer.deserialize(input);
                } catch (Exception e) {
                    throw new IOException("Failed to deserialize an element from the source. "
                            + "If you are using user-defined serialization (Value and Writable types), check the "
                            + "serialization functions.\nSerializer is " + serializer, e);
                }

                // Hold the checkpoint lock only while emitting.
                synchronized (lock) {
                    ctx.collect(next);
                }
                emitted++;

                // Sleep outside the lock: sleeping while holding it would block everything
                // else that synchronizes on the checkpoint lock for the entire delay.
                if (delay != null) {
                    LOGGER.debug("MockSource delaying for {}ms", delay.toMilliseconds());

                    Thread.sleep(delay.toMilliseconds());
                }
            }

            // Optionally keep the source alive so the job does not finish as soon as the
            // elements are exhausted.
            if (waitForCancel) {
                while (running) {
                    Thread.sleep(CANCEL_POLL_MILLIS);
                }
            }
        }

        /** Asks {@link #run(SourceContext)} to stop emitting (or idling) and return. */
        @Override
        public void cancel() {
            running = false;
        }

        /**
         * Returns the type information supplied to, or derived by, the constructor.
         *
         * @return the type information of the produced elements
         */
        @Override
        public TypeInformation<T> getProducedType() {
            return typeInfo;
        }
    }