I have this pipeline: KafkaProducer -> Topic1 -> FlinkConsumer -> Topic2 -> KafkaConsumer
I'm trying to extract the timing of each record at each stage of the pipeline.
In the Flink Java application I did something like this:
inputstream
        // Stage 1: record the Flink input time (writeDataLineByLine is a helper defined elsewhere)
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                long now = System.nanoTime();
                System.out.printf("source time : %d\n", now);
                writeDataLineByLine("flinkinput_data.csv", -1, now);
                return s;
            }
        })
        // Stage 2: simulate per-record processing work (~2 ms of sleep)
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws InterruptedException {
                for (int i = 0; i < 2; i++) {
                    Thread.sleep(1);
                }
                return record + " mapped";
            }
        })
        // Stage 3: record the Flink output time
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                long now = System.nanoTime();
                System.out.printf("sink time : %d\n", now);
                writeDataLineByLine("flinkoutput_data.csv", -1, now);
                return s;
            }
        })
        .addSink(producer);
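For context, writeDataLineByLine is not shown above; a minimal sketch of such a helper, assuming it appends one "label,timestamp" row per call (this is an illustrative guess, not the actual implementation from the question):

// requires java.io.FileWriter and java.io.IOException
// Hypothetical sketch: appends one "label,timestamp" row to the given CSV file.
// Declared static so the anonymous MapFunctions above don't capture a
// non-serializable enclosing instance when Flink serializes the job graph.
private static synchronized void writeDataLineByLine(String fileName, long label, long timestamp) {
    try (FileWriter writer = new FileWriter(fileName, true)) { // append mode
        writer.write(label + "," + timestamp + "\n");
    } catch (IOException e) {
        throw new RuntimeException("Could not write to " + fileName, e);
    }
}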
While this works in the mini-cluster inside IntelliJ, it doesn't work on a standalone cluster. Can someone please explain why the print and write-to-CSV lines seem to be ignored?
They are not ignored. Whatever the task managers write to stdout goes into .out files in Flink's log directory on each of the task manager nodes (you can also view it in the Stdout tab of a task manager in the web UI), not in the console where you submitted the job. Likewise, the CSV files are written using relative paths, which resolve against each task manager process's working directory, so they are created on the task manager machines rather than on your local machine.
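If you want the timing lines to land somewhere predictable, a common alternative is to log them through SLF4J instead of printing, so they end up in the task managers' regular .log files alongside everything else Flink logs. A minimal sketch (the class and names here are illustrative, not from the question):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs the arrival time of each record; the output goes wherever the
// task managers' log4j configuration sends it.
public class InputTimeMarker implements MapFunction<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(InputTimeMarker.class);

    @Override
    public String map(String s) {
        // Note: System.nanoTime() has an arbitrary per-JVM origin, so values
        // taken in different JVMs (producer, task managers, consumer) are not
        // directly comparable; use System.currentTimeMillis() for cross-stage latency.
        LOG.info("source time : {}", System.nanoTime());
        return s;
    }
}

You would then use it as inputstream.map(new InputTimeMarker()). Keep in mind that with parallelism greater than one, each parallel subtask logs (or writes its CSV) on whichever node it runs, so you still have to collect the files from every task manager afterwards.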