Tags: apache-flink, flink-streaming

Apache Flink Avro FileSink is stuck in the in-progress state for a long time


I have the below Avro schema in User.avsc:

{
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

The User.java class below was generated from the above User.avsc using the avro-maven-plugin:

package com.myorg;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;

@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.myorg\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;
    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;
    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User)DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch(field$) {
        case 0:
            return this.id;
        case 1:
            return this.name;
        default:
            throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch(field$) {
        case 0:
            this.id = (Long)value$;
            break;
        case 1:
            this.name = (String)value$;
            break;
        default:
            throw new AvroRuntimeException("Bad index");
        }

    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }

}
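
As a quick sanity check, the generated single-object encoder and decoder round-trip a record on their own, so Avro serialization itself is not the issue (the UserRoundTrip wrapper below is only for illustration and is not part of the generated code):

import java.nio.ByteBuffer;

import com.myorg.User;

public class UserRoundTrip {
    public static void main(String[] args) throws Exception {
        User in = new User(1L, "raj");
        // toByteBuffer()/fromByteBuffer() use the generated
        // BinaryMessageEncoder/BinaryMessageDecoder shown above.
        ByteBuffer buf = in.toByteBuffer();
        User out = User.fromByteBuffer(buf);
        System.out.println(out.getId() + " / " + out.getName()); // prints: 1 / raj
    }
}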

I want to write instances of the User SpecificRecord to files using Apache Flink's FileSink.

Below is the program that I wrote:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.myorg.User;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import java.util.Arrays;

public class AvroFileSinkApp {

    private static final String OUTPUT_PATH = "./il/";
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(4);
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("il")
                .withPartSuffix(".avro")
                .build();

        DataStream<User> source = env.fromCollection(Arrays.asList(getUser(), getUser(), getUser(), getUser(), getUser(), getUser()));
        source.sinkTo(
                FileSink.forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                        .withBucketCheckInterval(5000)
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .withOutputFileConfig(config)
                        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy/MM/dd/HH"))
                        .build());
        env.execute("FileSinkProgram");
        Thread.sleep(300000);
    }

    public static User getUser() {
        User u = new User();
        u.setId(1L);
        u.setName("raj");
        return u;
    }
}

I wrote this program using this and this as references. The project is on GitHub here.

When I run the program, the in-progress files are created, but checkpoints never commit the temp files. I added Thread.sleep(300000); but the in-progress files are never finalized into .avro files.

[image: in-progress avro files]

I've kept the main thread waiting for an hour as well, but no luck.

Any idea what is stopping the in-progress files from moving to the finished state?


Solution

  • This problem occurs mainly because the source is a BOUNDED source: the entire Flink job finishes before any checkpoint has been executed, and since OnCheckpointRollingPolicy only finalizes part files on a checkpoint, the in-progress files are never committed.

    You can refer to the following example of a continuously emitting, checkpointable source, and use the same pattern to generate User records instead of using fromCollection (a sketch adapting it to this job follows the code):

        import org.apache.flink.api.common.state.ListState;
        import org.apache.flink.api.common.state.ListStateDescriptor;
        import org.apache.flink.api.common.typeutils.base.IntSerializer;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.runtime.state.FunctionInitializationContext;
        import org.apache.flink.runtime.state.FunctionSnapshotContext;
        import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
        import org.apache.flink.streaming.api.functions.source.SourceFunction;

        /** Data-generating source function. */
        public final class Generator
                implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {
    
            private static final long serialVersionUID = -2819385275681175792L;
    
            private final int numKeys;
            private final int idlenessMs;
            private final int recordsToEmit;
    
            private volatile int numRecordsEmitted = 0;
            private volatile boolean canceled = false;
    
            private ListState<Integer> state = null;
    
            Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
                this.numKeys = numKeys;
                this.idlenessMs = idlenessMs;
    
                this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
            }
    
            @Override
            public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
                while (numRecordsEmitted < recordsToEmit) {
                    synchronized (ctx.getCheckpointLock()) {
                        for (int i = 0; i < numKeys; i++) {
                            ctx.collect(Tuple2.of(i, numRecordsEmitted));
                            numRecordsEmitted++;
                        }
                    }
                    Thread.sleep(idlenessMs);
                }
    
                while (!canceled) {
                    Thread.sleep(50);
                }
            }
    
            @Override
            public void cancel() {
                canceled = true;
            }
    
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                state =
                        context.getOperatorStateStore()
                                .getListState(
                                        new ListStateDescriptor<Integer>(
                                                "state", IntSerializer.INSTANCE));
    
                for (Integer i : state.get()) {
                    numRecordsEmitted += i;
                }
            }
    
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                state.clear();
                state.add(numRecordsEmitted);
            }
        }
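
    For this question's job specifically, the same pattern can be boiled down to a minimal unbounded source that keeps emitting User records, so the job stays running long enough for checkpoints to fire and OnCheckpointRollingPolicy to commit the part files. This is only a sketch: the UserGenerator name and the 100 ms emission interval are illustrative, and the checkpointed-state bookkeeping from the Generator above is omitted for brevity.

        import org.apache.flink.streaming.api.functions.source.SourceFunction;

        import com.myorg.User;

        /** Unbounded source emitting User records until canceled. */
        public final class UserGenerator implements SourceFunction<User> {

            private static final long serialVersionUID = 1L;

            private volatile boolean canceled = false;

            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                long id = 0;
                while (!canceled) {
                    // Emit under the checkpoint lock so a checkpoint cannot
                    // interleave with record emission.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(new User(id++, "raj"));
                    }
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                canceled = true;
            }
        }

    In AvroFileSinkApp, the bounded collection source is then replaced with:

        DataStream<User> source = env.addSource(new UserGenerator());

    With checkpointing every 5 seconds and a source that never finishes on its own, each checkpoint now rolls the in-progress part files into finished .avro files. Newer Flink versions can also checkpoint after bounded tasks finish (see the execution.checkpointing.checkpoints-after-tasks-finish.enabled option), but an unbounded source is the simplest fix here.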