hdfs apache-flink flink-streaming

Flink 1.11 StreamingFileSink flush record(s) early to in-progress file


We have an hourly-bucketed StreamingFileSink (writing to HDFS) that receives records relatively infrequently. Is there a way to configure Flink to flush records to the in-progress file as soon as they arrive (within less than 1 minute), instead of Flink keeping them in a buffer?

The requirement is that a downstream data analysis process can read the in-progress file in near real time.

I know how to shorten the InactivityInterval, but that results in too many small files in the end, which is undesirable.


Solution

  • You may want to look at the implementations of the write function. Several implementations write the data to the HDFS file in real time. StreamingFileSink itself does not keep records in a buffer, but there are some buffers inside the FileOutputStream.

    import java.io.IOException;

    public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {

        /**
         * Write an element to the part file.
         *
         * @param element the element to be written.
         * @param currentTime the writing time.
         * @throws IOException Thrown if writing the element fails.
         */
        void write(final IN element, final long currentTime) throws IOException;

        /**
         * @return The state of the current part file.
         * @throws IOException Thrown if persisting the part file fails.
         */
        InProgressFileRecoverable persist() throws IOException;

        /**
         * @return The state of the pending part file. {@link Bucket} uses this to commit the pending
         *     file.
         * @throws IOException Thrown if an I/O error occurs.
         */
        PendingFileRecoverable closeForCommit() throws IOException;

        /** Dispose the part file. */
        void dispose();

        // ------------------------------------------------------------------------

        /** A handle that can be used to recover an in-progress file. */
        interface InProgressFileRecoverable extends PendingFileRecoverable {}

        /** A handle that can be used to recover a pending file. */
        interface PendingFileRecoverable {}
    }
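
To illustrate the point about buffers sitting inside the output stream rather than in the sink, here is a minimal sketch using only java.io. `RecordEncoder` is a hypothetical stand-in with the same shape as Flink's `Encoder#encode(element, stream)`, and a `BufferedOutputStream` over a `ByteArrayOutputStream` stands in for the part file stream. Whether calling `flush()` on the real stream makes the bytes visible to HDFS readers depends on the file system implementation and is an assumption to verify, not something this sketch demonstrates.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class FlushPerRecordSketch {

    /** Hypothetical stand-in shaped like Flink's Encoder#encode(element, stream). */
    interface RecordEncoder<IN> {
        void encode(IN element, OutputStream stream) throws IOException;
    }

    /** Writes one line per record and flushes the stream right after it. */
    static final class FlushingLineEncoder implements RecordEncoder<String> {
        @Override
        public void encode(String element, OutputStream stream) throws IOException {
            stream.write(element.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
            stream.flush(); // push buffered bytes down to the underlying stream
        }
    }

    /** Returns {bytes visible after an unflushed write, bytes visible after a flushing encode}. */
    public static int[] demo() throws IOException {
        // Stand-in for the part file; size() shows what a reader could see.
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        OutputStream buffered = new BufferedOutputStream(file, 8192);

        // A plain write stays in the stream's buffer.
        buffered.write("a\n".getBytes(StandardCharsets.UTF_8));
        int beforeFlush = file.size();

        // The flushing encoder pushes both records through.
        new FlushingLineEncoder().encode("b", buffered);
        int afterFlush = file.size();

        return new int[] {beforeFlush, afterFlush};
    }

    public static void main(String[] args) throws IOException {
        int[] sizes = demo();
        System.out.println("visible before flush: " + sizes[0]);
        System.out.println("visible after flush:  " + sizes[1]);
    }
}
```

If this approach is tried with a real row-format sink, the encoder would implement Flink's own `Encoder` interface instead of the stand-in. Flushing on every record trades throughput for latency, which may be acceptable here because records are infrequent.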