In my Dataflow job, I need to initialize a Config factory and log certain messages in an audit log before the actual processing begins.
I have placed the Config factory initialization code and the audit logging in a parent class, PlatformInitializer, and my main pipeline class extends it:
public class CustomJob extends PlatformInitializer implements Serializable {

    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();

        // Initialize config factories
        myCustomjob.initialize();

        // Trigger the Dataflow job
        myCustomjob.parallelRead(args);
    }

    // ...
}
As a result, I also had to implement the Serializable interface in my pipeline class, because Beam was throwing java.io.NotSerializableException: org.devoteam.CustomJob.
Inside PlatformInitializer, I have an initialize() method that contains the initialization logic for the config factory and also logs some initial audit messages:
public class PlatformInitializer {

    public void initialize() {
        // Configfactory factory = new Configfactory();
        // CustomLogger.log("JOB-BEGIN-EVENT " + <current timestamp>);
    }
}
My question is: is this the right way to invoke code that needs to run before the pipeline begins execution?
If you need the initialized object at runtime (not at pipeline construction time), you should move your initialization logic into a Beam DoFn. DoFn has a number of method annotations that can be used to mark methods to be executed in different lifecycle phases. The Setup and StartBundle annotations might be useful for your use case. See the DoFn documentation for more details.
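A minimal sketch of what that could look like, reusing the Configfactory and CustomLogger names from your question (both are assumed to be your own classes; the DoFn name and element types here are just placeholders):

import java.time.Instant;
import org.apache.beam.sdk.transforms.DoFn;

public class AuditedReadFn extends DoFn<String, String> {

    // transient: not serialized with the DoFn, re-created on each worker in @Setup
    private transient Configfactory factory;

    @Setup
    public void setup() {
        // Runs once per DoFn instance on a worker, before any bundle is processed.
        factory = new Configfactory();
        CustomLogger.log("JOB-BEGIN-EVENT " + Instant.now());
    }

    @StartBundle
    public void startBundle() {
        // Runs before every bundle; useful for per-bundle setup rather than one-time init.
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
        // Actual processing; the factory initialized in setup() is available here.
        out.output(element);
    }
}

One caveat: @Setup runs once per DoFn instance on each worker, not once per job, so if the JOB-BEGIN-EVENT audit entry must be emitted exactly once, you may still want to log it from main() before calling Pipeline.run() and keep only the config-factory initialization in @Setup.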