I'm working on a Dataflow pipeline with Apache Beam that reads input files from a GCS bucket. After the pipeline has finished, I want to run some follow-up tasks, such as moving the input file to another GCS location.
I have written the code below, which works fine when I run it from my local system, but when I create a template and deploy it, the post-processing step does not run.
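```java
import org.apache.beam.sdk.PipelineResult;

// p is the org.apache.beam.sdk.Pipeline built earlier.
PipelineResult result = p.run();
try {
    // Blocks until the job reaches a terminal state and returns that state.
    PipelineResult.State state = result.waitUntilFinish();
    if (state == PipelineResult.State.DONE) {
        // Do some post-processing task, e.g. move the input file
        // from one location to another.
    }
} catch (UnsupportedOperationException e) {
    // The runner cannot wait on this result; do nothing.
} catch (Exception e) {
    e.printStackTrace();
}
```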
The template only captures the pipeline definition so that it can be run at a later date; launching the template does not re-execute your `main` program, including anything that follows `waitUntilFinish()`. To move the file, you would either have to make the move part of the pipeline itself (e.g. using a `Wait` transform followed by a `ParDo` that performs the action) or embed the job in a larger orchestration system such as Apache Airflow.
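A minimal sketch of the first option, assuming a batch pipeline that matches text files with a file pattern and writes its output with `FileIO.write()`. The bucket paths, the class name `MoveInputAfterWrite`, and the helper `archiveDestination` are hypothetical. The output file names produced by the write serve as the signal for `Wait.on`, so the `ParDo` that moves the inputs only runs once the write has completed, and because it runs on the workers it is part of what the template executes.

```java
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class MoveInputAfterWrite {

  // Hypothetical locations; replace with your own buckets.
  static final String INPUT_PATTERN = "gs://my-bucket/input/*.csv";
  static final String OUTPUT_DIR = "gs://my-bucket/output/";
  static final String ARCHIVE_DIR = "gs://my-bucket/archive/";

  /** Hypothetical helper: where src ends up inside archiveDir, keeping its file name. */
  static ResourceId archiveDestination(ResourceId src, String archiveDir) {
    return FileSystems.matchNewResource(archiveDir, true /* isDirectory */)
        .resolve(src.getFilename(), StandardResolveOptions.RESOLVE_FILE);
  }

  /** Moves each matched input file into archiveDir. */
  static class MoveFileFn extends DoFn<MatchResult.Metadata, Void> {
    private final String archiveDir;

    MoveFileFn(String archiveDir) {
      this.archiveDir = archiveDir;
    }

    @ProcessElement
    public void processElement(@Element MatchResult.Metadata file) throws IOException {
      ResourceId src = file.resourceId();
      ResourceId dest = archiveDestination(src, archiveDir);
      // On GCS a rename is a copy followed by a delete. IGNORE_MISSING_FILES lets a
      // retried bundle succeed even if the file was already moved by an earlier attempt.
      FileSystems.rename(
          Collections.singletonList(src),
          Collections.singletonList(dest),
          MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Match the inputs once so the same metadata drives both the read and the move.
    PCollection<MatchResult.Metadata> inputs =
        p.apply("MatchInput", FileIO.match().filepattern(INPUT_PATTERN));

    PCollection<String> lines =
        inputs.apply("ReadMatches", FileIO.readMatches()).apply("ReadLines", TextIO.readFiles());

    // ... your real transforms would go here ...

    // FileIO.write() returns a WriteFilesResult whose output file names can act as a signal.
    WriteFilesResult<Void> written =
        lines.apply(
            "WriteOutput",
            FileIO.<String>write().via(TextIO.sink()).to(OUTPUT_DIR).withSuffix(".txt"));

    // Wait.on holds back the input metadata until the write signal is complete,
    // so the files are only moved after the output has been written.
    inputs
        .apply(
            "WaitForWrite",
            Wait.<MatchResult.Metadata>on(written.getPerDestinationOutputFilenames()))
        .apply("MoveInputFiles", ParDo.of(new MoveFileFn(ARCHIVE_DIR)));

    p.run();
  }
}
```

For a classic template, the hard-coded paths are fixed when the template is created; to supply them at launch time you would typically expose them as `ValueProvider` pipeline options instead. Also note that the move is not transactional: if the job fails partway through the moves, some input files may already have been archived.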