
Flink RichParallelSourceFunction - close() vs cancel()


I am implementing a RichParallelSourceFunction that reads files over SFTP. RichParallelSourceFunction inherits cancel() from SourceFunction and close() from RichFunction. As far as I understand it, both cancel() and close() are invoked before the source is torn down, so in both of them I have to add logic for stopping the endless loop that reads files.

When I set the parallelism of the source to 1 and I run the Flink job from the IDE, the Flink runtime invokes stop() right after it invokes start() and the whole job is stopped. I didn't expect this.

When I set the parallelism of the source to 1 and I run the Flink job in a cluster, the job runs as usual. If I leave the parallelism of the source at the default (in my case 4), the job also runs as usual.

Using Flink 1.7.


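import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SftpSource<TYPE_OF_RECORD>
    extends RichParallelSourceFunction<TYPE_OF_RECORD>
{
    private static final Logger mLogger = LoggerFactory.getLogger(SftpSource.class);

    // SftpConnection is my own class (not part of Flink); it has to be
    // Serializable because it is a field of the source function
    private final SftpConnection mConnection;

    // volatile: cancel() is called from a different thread than run();
    // initialized to true so that the loop in run() is actually entered
    private volatile boolean mSourceIsRunning = true;

    public SftpSource(SftpConnection aConnection)
    {
        mConnection = aConnection;
    }

    @Override
    public void open(Configuration parameters) throws Exception
    {
        mConnection.open();
    }

    @Override
    public void close()
    {
        mSourceIsRunning = false;
    }


    @Override
    public void run(SourceContext<TYPE_OF_RECORD> aContext)
    {
        while (mSourceIsRunning)
        {
            synchronized ( aContext.getCheckpointLock() )
            {
                // use mConnection
                // aContext.collect() ...
            }

            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException ie)
            {
                // restore the interrupt status and leave the loop
                Thread.currentThread().interrupt();
                mLogger.warn("Thread error: {}", ie.getMessage() );
                break;
            }
        }

        mConnection.close();
    }


    @Override
    public void cancel()
    {
        mSourceIsRunning = false;
    }
}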

So I have workarounds and the question is more about the theory. Why is close() invoked if parallelism is 1 and the job is run from the IDE (i.e. from the command line)? Also, do close() and cancel() do the same thing in a RichParallelSourceFunction?


Solution

  • Why is close() invoked if parallelism is 1 and the job is run from the IDE?

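    close() is called after the last call to the main working methods (e.g. map or join). This method can be used for clean-up work. It is called regardless of the configured parallelism. For a source, the main working method is run(), so close() is only called once run() has returned; it is not a hook for breaking out of the loop.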

    Also, do close() and cancel() do the same thing in a RichParallelSourceFunction?

    They aren't the same thing. Take a look at how cancel() is described in the SourceFunction Javadoc:

    Cancels the source. Most sources will have a while loop inside the run(SourceContext) method. The implementation needs to ensure that the source will break out of that loop after this method is called.
    

    https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html#cancel--

    The following link may help you to understand the task lifecycle: https://ci.apache.org/projects/flink/flink-docs-stable/internals/task_lifecycle.html#operator-lifecycle-in-a-nutshell
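To make the difference between cancel() and close() concrete, here is a simplified plain-Java model with no Flink dependency. LoopingSource and TaskDriver are hypothetical names, and the driver only roughly imitates the order in which a task drives a source; it is not Flink's actual code. It follows the pattern from the cancel() Javadoc: cancel() is called from another thread and only flips a volatile flag, run() leaves its loop, and close() runs afterwards in the task thread, so close() is where resources get released rather than where the loop is stopped.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a SourceFunction; no Flink classes are used.
class LoopingSource {
    // Written by cancel() on another thread and read by run(), so it must be volatile.
    private volatile boolean running = true;

    // Records the order of lifecycle calls; thread-safe because two threads write to it.
    final List<String> events = new CopyOnWriteArrayList<>();

    void open() {
        events.add("open");
    }

    void run() {
        events.add("run:start");
        while (running) {
            // a real source would emit records here (ctx.collect(...))
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
        events.add("run:end");
    }

    // Called from a different thread than run(); it only signals the loop to stop.
    void cancel() {
        events.add("cancel");
        running = false;
    }

    // Called after run() has returned; release resources (e.g. a connection) here.
    void close() {
        events.add("close");
    }
}

// Hypothetical driver mimicking, in simplified form, how a task calls the source.
class TaskDriver {
    static void runAndCancel(LoopingSource source, long runMillis) throws InterruptedException {
        source.open();
        Thread taskThread = new Thread(() -> {
            try {
                source.run();
            } finally {
                source.close(); // close() only happens once run() has returned
            }
        });
        taskThread.start();
        Thread.sleep(runMillis);
        source.cancel();          // issued from another thread (here: the main thread)
        taskThread.join(5000);    // wait for run() and close() to finish
    }

    public static void main(String[] args) throws InterruptedException {
        LoopingSource source = new LoopingSource();
        runAndCancel(source, 50);
        System.out.println(source.events);
    }
}
```

In this model, setting the flag in close() would never stop the loop, because close() is only reached after run() has already exited. Applied to the SftpSource above, that suggests letting cancel() stop the loop and doing the mConnection.close() clean-up in close().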