
How to run a post-processing task in Dataflow? (Post-processing meaning after the pipeline has finished executing.)


I'm building a Dataflow pipeline with Apache Beam that reads input files from a GCS bucket. After the pipeline finishes, I want to run some tasks, such as moving the input files to another GCS location.

I have written the code below, which works fine when I run it from my local system, but when I create a template and deploy it, it doesn't work.

    PipelineResult result = p.run();

    try {
        // Block until the job finishes and get its terminal state.
        PipelineResult.State state = result.waitUntilFinish();
        if (state == PipelineResult.State.DONE) {
            // Do some post-processing task, e.g. move the input file
            // from one location to another.
        }
    } catch (UnsupportedOperationException e) {
        // The runner may not support waiting on the job (e.g. when the
        // pipeline is only being staged as a template); ignored here.
    } catch (Exception e) {
        e.printStackTrace();
    }

Solution

  • A template only captures the pipeline definition so that it can be run at a later date; launching the template does not re-execute your main program, including anything that follows waitUntilFinish(). To move the file, you have to either make the move part of the pipeline itself (e.g. a Wait transform followed by a ParDo that performs the move) or embed the pipeline in a larger orchestration system such as Airflow.
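A minimal sketch of the in-pipeline approach, assuming the Beam Java SDK (beam-sdks-java-core) and, for the local demo, the direct runner (beam-runners-direct-java) are on the classpath. It uses Wait.on with the output filenames from TextIO.write().withOutputFilenames() as the signal, then moves the input with FileSystems.rename inside a ParDo. The class MoveAfterWrite, the DoFn MoveFileFn, the helper addTransforms and the archive directory are hypothetical names for illustration; main uses local temp files instead of GCS so the flow can be tried without a cloud project.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class MoveAfterWrite {

  /** Hypothetical DoFn: moves each input file into archiveDir, keeping its file name. */
  static class MoveFileFn extends DoFn<String, Void> {
    private final String archiveDir;

    MoveFileFn(String archiveDir) {
      this.archiveDir = archiveDir;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      ResourceId src = FileSystems.matchNewResource(c.element(), false /* isDirectory */);
      ResourceId dest =
          FileSystems.matchNewResource(archiveDir, true /* isDirectory */)
              .resolve(src.getFilename(), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
      // IGNORE_MISSING_FILES skips sources that no longer exist, so a retried bundle is a no-op.
      FileSystems.rename(
          Collections.singletonList(src),
          Collections.singletonList(dest),
          MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }

  /** Read, then write, and move the input file only after the write has completed. */
  static void addTransforms(Pipeline p, String inputFile, String outputPrefix, String archiveDir) {
    PCollection<String> lines = p.apply("ReadInput", TextIO.read().from(inputFile));

    // withOutputFilenames() exposes the written files as a PCollection that can act as a signal.
    WriteFilesResult<Void> written =
        lines.apply("WriteOutput", TextIO.write().to(outputPrefix).withOutputFilenames());

    p.apply("InputPath", Create.of(inputFile))
        // Each window is held back until the same window of the signal is complete.
        .apply("WaitForWrite", Wait.on(written.getPerDestinationOutputFilenames()))
        .apply("MoveInput", ParDo.of(new MoveFileFn(archiveDir)));
  }

  /** Local demo on the default (direct) runner, using temp files instead of GCS. */
  public static void main(String[] args) throws IOException {
    Path workDir = Files.createTempDirectory("move-after-write");
    Path input =
        Files.write(workDir.resolve("input.txt"), "a\nb\n".getBytes(StandardCharsets.UTF_8));
    Path archive = Files.createDirectories(workDir.resolve("archive"));

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    addTransforms(p, input.toString(), workDir.resolve("out/part").toString(), archive.toString());
    p.run().waitUntilFinish();

    System.out.println("input still present: " + Files.exists(input));
    System.out.println("archived copy present: " + Files.exists(archive.resolve("input.txt")));
  }
}
```

Because the move is a step in the pipeline graph, it is captured in the template and runs on the workers, unlike code placed after waitUntilFinish(). On Dataflow you would pass gs:// paths (which needs the GCP IO module on the classpath). If the input path is a runtime template parameter, it would arrive as a ValueProvider<String>, in which case something like Create.ofProvider(...) and TextIO.read().from(ValueProvider) would replace the plain strings used here; that variant is not shown.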