I am new to Flink, so apologies if my understanding is wrong. I am building a dataflow application whose flow contains multiple data streams. Each stream checks whether the required fields are present in the incoming DataStream. My application validates the incoming data, and if validation succeeds, it should append the data to the given file if that file already exists. I want to check that an exception in one DataStream does not impact the other data streams. To simulate this, I explicitly throw an exception in one of the flows. For simplicity, the example below appends the data to a text file on Windows.
Note: my flow doesn't have any state, since I don't have anything to store in state.
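import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

public class ExceptionTest {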
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
// to set minimum progress time to happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within 5000 ms, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(5000);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // DELETE_ON_CANCELLATION
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStream<String> input1 = env.fromElements("hello");
DataStream<String> input2 = env.fromElements("hello");
DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
output2.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
// This sink is not part of checkpointing: every record it receives is appended immediately
try {
File myObj = new File("C://flinkOutput//filename.txt");
if (myObj.createNewFile()) {
System.out.println("File created: " + myObj.getName());
} else {
System.out.println("File already exists.");
}
// Append mode; try-with-resources closes the writer even if write() fails
try (BufferedWriter out = new BufferedWriter(new FileWriter(myObj, true))) {
out.write(value);
}
System.out.println("Successfully wrote to the file.");
} catch (IOException e) {
System.out.println("An error occurred.");
e.printStackTrace();
}
}
});
env.execute();
}
}
I have a few doubts:
When I throw the exception in the output1 stream, the second flow (output2) keeps running even after the exception. It writes data to the file on my local machine, but when I check the file, the output is as below:
hello world
hello world
hello world
hello world
As I understand the Flink documentation, EXACTLY_ONCE checkpointing mode should write the data to the file no more than once, since the process has already completed and written the data. But that is not happening in my case, and I can't tell whether I am doing something wrong.
Please help me clear up my doubts about checkpointing. How can I achieve exactly-once behavior? I read about two-phase commit in Flink, but I didn't find any example of how to implement it.
As suggested by @Mikalai Lushchytski, I implemented a StreamingFileSink as below.
With StreamingFileSink
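import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;

public class ExceptionTest {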
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
// to set minimum progress time to happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within 5000 ms, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(5000);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // DELETE_ON_CANCELLATION
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStream<String> input1 = env.fromElements("hello");
DataStream<String> input2 = env.fromElements("hello");
DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
String outputPath = "C://flinkCheckpoint";
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1) // 1 byte: every record ends up in its own part file
.build())
.build();
output2.addSink(sink);
env.execute();
}
}
But when I check the checkpoint folder, I can see that it created four in-progress part files.
Is there anything in my code that causes it to create multiple part files?
In order to guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism (as well as the data source).
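The question's sink produced four identical lines. The most likely cause is that the job ran four times: the first attempt plus the three restarts allowed by `fixedDelayRestart(3, ...)`. No checkpoint completed before each failure, so every attempt replayed "hello" from the source. The plain-Java simulation below (no Flink involved) is a hedged illustration of that reasoning. It compares a sink that appends immediately with one that only commits once a checkpoint completes. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java simulation of replay after restarts (no Flink involved). */
class ReplaySimulation {

    /**
     * Runs the "job" once plus up to maxRestarts restarts. If the other branch fails, no
     * checkpoint ever completes, so every attempt replays the source from the beginning.
     */
    static List<String> run(int maxRestarts, boolean otherBranchFails, boolean transactionalSink) {
        List<String> file = new ArrayList<>();            // the external output file
        for (int attempt = 0; attempt <= maxRestarts; attempt++) {
            List<String> staged = new ArrayList<>();      // data written but not yet committed
            String record = "hello".concat(" world");     // output2's flatMap on the replayed "hello"
            if (transactionalSink) {
                staged.add(record);                       // held back until a checkpoint completes
            } else {
                file.add(record);                         // appended immediately, like the question's sink
            }
            if (otherBranchFails) {
                continue;                                 // output1 throws: staged data is discarded, job restarts
            }
            file.addAll(staged);                          // checkpoint completed: commit staged data
            return file;
        }
        return file;                                      // restarts exhausted: the job fails for good
    }

    public static void main(String[] args) {
        System.out.println(run(3, true, false)); // [hello world, hello world, hello world, hello world]
        System.out.println(run(3, true, true));  // []
    }
}
```

The model is deliberately simplified. In it, "exactly once" comes from the sink holding data back until a checkpoint confirms it, not from the checkpointing mode alone.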
If you are going to write the data to a file, you can use a StreamingFileSink, which emits its input elements to FileSystem files within buckets. It is integrated with the checkpointing mechanism to provide exactly-once semantics out of the box.
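A StreamingFileSink part file goes through three states. It is in-progress while being written, pending once the rolling policy closes it, and finished when a checkpoint that covers it completes. The model below is a hypothetical, simplified plain-Java illustration of that lifecycle, not Flink's implementation. It shows why `withMaxPartSize(1)`, as in the question's code, gives every record its own part file. It also shows why files stay in-progress or pending when no checkpoint ever completes, as happens when the job keeps failing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a row-format file sink's part files (not Flink's code). */
class PartFileModel {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long size = 0;

        PartFile(String name) { this.name = name; }
    }

    private final long maxPartSize;
    private final List<PartFile> files = new ArrayList<>();
    private PartFile inProgress;

    PartFileModel(long maxPartSize) { this.maxPartSize = maxPartSize; }

    /** Rolls first if the open part already exceeds maxPartSize, then appends the record. */
    void write(String record) {
        if (inProgress == null || inProgress.size > maxPartSize) {
            if (inProgress != null) {
                inProgress.state = State.PENDING;       // closed, waiting for a checkpoint
            }
            inProgress = new PartFile("part-0-" + files.size());
            files.add(inProgress);
        }
        inProgress.size += record.length();              // character count stands in for bytes
    }

    /** A completed checkpoint commits pending files; in-progress files are left open here. */
    void checkpointCompleted() {
        for (PartFile f : files) {
            if (f.state == State.PENDING) {
                f.state = State.FINISHED;
            }
        }
    }

    long count(State state) {
        return files.stream().filter(f -> f.state == state).count();
    }

    public static void main(String[] args) {
        PartFileModel sink = new PartFileModel(1);      // like withMaxPartSize(1)
        for (int i = 0; i < 4; i++) {
            sink.write("hello world");
        }
        System.out.println(sink.count(State.IN_PROGRESS) + " in-progress, " + sink.count(State.PENDING) + " pending");
        // prints "1 in-progress, 3 pending"; nothing is finished until a checkpoint completes
    }
}
```

With a realistic part size (for example 1024 in this model), the same four records stay in a single in-progress file.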
If you are going to implement your own sink, the sink function must implement the CheckpointedFunction interface. It must also properly implement the snapshotState(FunctionSnapshotContext context) method, which is called when a snapshot for a checkpoint is requested and is where the current application state is flushed. In addition, I would recommend implementing the CheckpointListener interface so the sink is notified once a distributed checkpoint has been completed.
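The plain-Java sketch below shows the idea behind those two callbacks for a hypothetical buffering file sink. Records are only buffered. `snapshotState(checkpointId)` seals the buffer under that checkpoint's id, and `notifyCheckpointComplete(checkpointId)` writes out everything sealed up to that id. The method names mirror Flink's callbacks, but the class is hypothetical and does not implement Flink's interfaces. Taking a plain `long` instead of a `FunctionSnapshotContext` is an assumption of the sketch. In a real sink, the sealed buffers would also have to be stored in Flink operator state so they survive a restart, which this sketch leaves out.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical buffering sink in plain Java. The method names mirror Flink's
 * SinkFunction#invoke, CheckpointedFunction#snapshotState and
 * CheckpointListener#notifyCheckpointComplete, but nothing here implements those interfaces.
 */
class BufferingFileSink {
    private final Path target;
    private List<String> current = new ArrayList<>();                    // records since last snapshot
    private final TreeMap<Long, List<String>> sealed = new TreeMap<>();  // checkpoint id -> records

    BufferingFileSink(Path target) {
        this.target = target;
    }

    /** Only buffers the record; the file is never touched here. */
    void invoke(String value) {
        current.add(value);
    }

    /** Seals the current buffer under the checkpoint id (a real sink would also persist it in state). */
    void snapshotState(long checkpointId) {
        sealed.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Appends every buffer sealed up to and including checkpointId, oldest first. */
    void notifyCheckpointComplete(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, List<String>>> it =
                sealed.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Files.write(target, it.next().getValue(), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            it.remove();                                                  // written, drop the buffer
        }
    }
}
```

One caveat: suppose the job fails after a buffer was appended to the file but before the next checkpoint records that it is gone. The write is then repeated after recovery. A real implementation therefore still needs an idempotent or transactional final write.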
Flink already provides an abstract TwoPhaseCommitSinkFunction, which is the recommended base class for every SinkFunction that intends to implement exactly-once semantics. It does so by implementing the two-phase commit algorithm on top of CheckpointedFunction and CheckpointListener. As an example, you can have a look at the FlinkKafkaProducer.java source code.
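The question asks for a two-phase commit example, so here is a hedged plain-Java sketch of the protocol for files. The method names `beginTransaction`, `invoke`, `preCommit`, `commit` and `abort` mirror the abstract methods of TwoPhaseCommitSinkFunction. The class itself is hypothetical and does not extend Flink's class. In Flink, the framework calls `preCommit` during a checkpoint, `commit` after that checkpoint completes, and `abort` for transactions that are rolled back. Using a temporary file as the transaction, and an atomic rename as the commit, are assumptions of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

/**
 * Hypothetical two-phase-commit file sink in plain Java. Method names mirror the abstract
 * methods of Flink's TwoPhaseCommitSinkFunction; the transaction handle is a temp file.
 */
class TwoPhaseFileSink {
    private final Path tmpDir;     // staging area, never read by consumers
    private final Path targetDir;  // must be on the same file system as tmpDir (atomic move)

    TwoPhaseFileSink(Path tmpDir, Path targetDir) {
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
    }

    /** Opens a new transaction: an empty temporary file. */
    Path beginTransaction() throws IOException {
        return Files.createTempFile(tmpDir, "txn-", ".tmp");
    }

    /** Writes a record into the transaction only; consumers cannot see it yet. */
    void invoke(Path txn, String value) throws IOException {
        Files.write(txn, Collections.singletonList(value), StandardCharsets.UTF_8,
                StandardOpenOption.APPEND);
    }

    /** Phase 1, during the checkpoint: make the staged data durable. */
    void preCommit(Path txn) throws IOException {
        try (FileChannel channel = FileChannel.open(txn, StandardOpenOption.WRITE)) {
            channel.force(true);   // fsync the temp file
        }
    }

    /** Phase 2, after the checkpoint completed: publish atomically. Safe to call twice. */
    void commit(Path txn) {
        String name = txn.getFileName().toString().replace(".tmp", ".txt");
        try {
            if (Files.exists(txn)) {
                Files.move(txn, targetDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
            } // otherwise it was already committed before a failure: nothing to do
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Rollback: delete the staged file; consumers never saw its contents. */
    void abort(Path txn) {
        try {
            Files.deleteIfExists(txn);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real job, you would extend TwoPhaseCommitSinkFunction and put this logic in its overrides. Flink keeps the pending transaction handles in checkpointed state and calls `commit` again for them after a restore. That is why `commit` has to tolerate being repeated.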