google-cloud-dataflow apache-beam apache-beam-io

Apache Beam Initializer


In my Dataflow job, I need to initialize a config factory and write certain messages to an audit log before the actual processing begins.

I have placed the config factory initialization code and the audit logging in a parent class, PlatformInitializer, which my main pipeline class extends.

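```java
import java.beans.PropertyVetoException;
import java.io.Serializable;

public class CustomJob extends PlatformInitializer implements Serializable {
    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();

        // Initialize config factories
        myCustomjob.initialize();

        // Trigger the Dataflow job
        myCustomjob.parallelRead(args);
    }

    // parallelRead(String[] args) is defined elsewhere (not shown in the question)
}
```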

As a result, I also had to implement the Serializable interface in my pipeline class, because Beam was throwing this error: java.io.NotSerializableException: org.devoteam.CustomJob
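A likely cause of that exception, assuming parallelRead (not shown) defines its DoFns as anonymous or inner classes inside an instance method, is that such classes keep a hidden reference to the enclosing CustomJob instance, so Beam must serialize CustomJob together with the DoFn. The plain-Java sketch below reproduces the effect without Beam. Job, SerializableFn, and PrefixFn are hypothetical stand-ins for the job class and a DoFn, and serializes() simply attempts standard Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for a Beam DoFn: any Serializable function object.
    interface SerializableFn extends Serializable {
        String apply(String input);
    }

    // Hypothetical stand-in for CustomJob WITHOUT "implements Serializable".
    static class Job {
        String prefix = "audit-"; // non-final, so reading it needs Job.this

        // Anonymous class in an instance method: it reads Job.this.prefix,
        // so it holds a reference to the enclosing (non-serializable) Job.
        SerializableFn anonymousFn() {
            return new SerializableFn() {
                @Override
                public String apply(String input) {
                    return prefix + input;
                }
            };
        }
    }

    // Static nested class: copies the value it needs, no enclosing instance.
    static class PrefixFn implements SerializableFn {
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String apply(String input) {
            return prefix + input;
        }
    }

    // Returns true if Java serialization of obj succeeds.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Job job = new Job();
        System.out.println(serializes(job.anonymousFn()));        // false
        System.out.println(serializes(new PrefixFn(job.prefix))); // true
    }
}
```

In a Beam job the same idea typically applies: declaring DoFns as static nested or top-level classes and passing them only the serializable values they need means the job class itself no longer has to implement Serializable.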

Inside PlatformInitializer, I have an initialize() method that contains the initialization logic for the config factory and also logs some initial audit messages.

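```java
public class PlatformInitializer {

    public void initialize() {
        // Initialize the config factory:
        // Configfactory factory = new Configfactory();

        // Log the job-start audit event with the current timestamp:
        // CustomLogger.log("JOB-BEGIN-EVENT " + currentTimestamp);
    }
}
```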

My question is: is this the right way to invoke code that needs to run before the pipeline begins execution?


Solution

  • If you need the initialized object at runtime (not at pipeline construction time), you should move your initialization logic into a Beam DoFn. DoFn supports several method annotations that mark methods to be executed in different lifecycle phases; @Setup (called once per DoFn instance, before any bundle is processed) and @StartBundle (called before each bundle) might be useful for your use case. See the DoFn lifecycle section of the Apache Beam programming guide for more details.
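A minimal sketch of that approach, assuming the Beam Java SDK (beam-sdks-java-core) and the DirectRunner are on the classpath. ConfigFactory is a hypothetical stand-in for the question's Configfactory, EnrichFn and LifecycleJob are hypothetical names, and java.util.logging stands in for CustomLogger. The one-off JOB-BEGIN-EVENT is logged in main() on the machine that launches the job, while the config object is rebuilt on each worker in @Setup; the @StartBundle method only marks where per-bundle work would go.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class LifecycleJob {

    // java.util.logging used here as a stand-in for the question's CustomLogger.
    private static final Logger AUDIT = Logger.getLogger("audit");

    // Hypothetical stand-in for the question's Configfactory (not Serializable).
    static class ConfigFactory {
        private final Map<String, String> values = new HashMap<>();

        ConfigFactory(String env) {
            values.put("env", env);
        }

        String get(String key) {
            return values.get(key);
        }
    }

    // Static nested DoFn: no hidden reference to LifecycleJob.
    static class EnrichFn extends DoFn<String, String> {
        private final String env;               // serialized and shipped to workers
        private transient ConfigFactory config; // not serialized; rebuilt in @Setup

        EnrichFn(String env) {
            this.env = env;
        }

        @Setup
        public void setup() {
            // Called once per DoFn instance on a worker, before any bundle.
            config = new ConfigFactory(env);
        }

        @StartBundle
        public void startBundle() {
            // Called before each bundle; reset per-bundle state here if needed.
        }

        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<String> out) {
            out.output(enrich(line));
        }

        // Plain method so the logic can be checked without a runner.
        String enrich(String line) {
            return config.get("env") + ":" + line;
        }
    }

    public static void main(String[] args) {
        // Construction time: runs once, on the machine that launches the job.
        AUDIT.info("JOB-BEGIN-EVENT " + Instant.now());

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("a", "b"))
         .apply(ParDo.of(new EnrichFn("dev")));
        p.run().waitUntilFinish();
    }
}
```

One design consequence worth noting: a runner may create many DoFn instances across workers and threads, so @Setup can run many times per job. It suits idempotent initialization such as building a config object, whereas a once-per-job audit event fits better in main(), where it records when the job was submitted rather than when workers start processing.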