Tags: google-cloud-dataflow, apache-beam, dataflow

How can I use Guice to inject my API into Dataflow jobs without it needing to be serializable?


This question follows on from the great answer to "Is there a way to upload jars for a dataflow job so we don't have to serialize everything?"

This made me realize 'ok, what I want is injection with no serialization so that I can mock and test'.

Our current approach requires our APIs and mocks to be serializable, BUT THEN I have to put static fields in the mock, because the mock gets serialized and deserialized, which creates a new instance, and that new instance is the one Dataflow uses.
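To see why the static fields become necessary, here is a minimal plain-Java simulation (no Beam or Guice involved) of what a runner does with a DoFn: it serializes the function and deserializes a copy, and only the copy is invoked. RecordingApi, Fn and SerializationDemo.roundTrip are hypothetical names for illustration. Calls recorded in an instance field land on the copy, so the test's original mock never sees them; a static field "works" only because it is shared by every instance in the JVM.

```java
import java.io.*;
import java.util.*;

// Hypothetical serializable mock that records calls two ways.
class RecordingApi implements Serializable {
    // Instance state: copied into the deserialized instance, so the test's original never sees new calls.
    final List<String> written = new ArrayList<>();
    // Static state: not serialized at all; shared by every instance in this JVM.
    static final List<String> WRITTEN_STATIC = new ArrayList<>();

    void writeRecord(String record) {
        written.add(record);
        WRITTEN_STATIC.add(record);
    }
}

// Hypothetical stand-in for a DoFn that holds the API in a field.
class Fn implements Serializable {
    final RecordingApi api;

    Fn(RecordingApi api) {
        this.api = api;
    }

    void process(String element) {
        api.writeRecord(element);
    }
}

final class SerializationDemo {
    // Serialize then deserialize, roughly what happens when a runner ships a DoFn to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In a test, new Fn(mock) goes through roundTrip, and calling process on the result updates worker.api.written and RecordingApi.WRITTEN_STATIC while mock.written stays empty, which is exactly the symptom described above.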

My colleague suggested that perhaps this needs to be a sink, which might be treated differently. We may try that later and post an update, but we are not sure right now.

What I want is to replace the APIs with mocks from the top level during testing. Does anyone have an example of this?

Here is our bootstrap code, which does not know whether it is running in production or inside a feature test. Our tests check end-to-end results and contain no Apache Beam imports, which means we could switch to another technology and keep all of our tests. Beyond that, we catch far more integration bugs and can refactor without rewriting tests, since the contracts we test are customer-facing ones we can't easily change.

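import com.google.inject.Inject;
import org.apache.beam.sdk.Pipeline;

public class App {

    private final Pipeline pipeline;
    private final RosterFileTransform transform;  // our own PTransform, supplied by Guice

    @Inject
    public App(Pipeline pipeline, RosterFileTransform transform) {
        this.pipeline = pipeline;
        this.transform = transform;
    }

    // The Pipeline is injected, so it may use the direct runner (tests) or Dataflow (production).
    public void start() {
        pipeline.apply(transform);
        pipeline.run();
    }
}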

Notice that everything we do is based on Guice injection, so the Pipeline may or may not use the direct runner. I may need to modify this class to pass things through :( but anything that works for now would be great.

The function into which I am trying to get our API (both the mock and the real implementation) without serialization is:

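// microServiceApi comes from the enclosing class; supplying it without serialization is the open question.
// Note: as a non-static inner class, this DoFn also captures (and serializes) its enclosing instance.
private class ValidRecordPublisher extends DoFn<Validated<PractitionerDataRecord>, String> {
    @ProcessElement
    public void processElement(@Element Validated<PractitionerDataRecord> element) {
        microServiceApi.writeRecord(element.getValue());
    }
}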

I am not sure how to pass in microServiceApi in a way that avoids serialization. I would also be fine with delayed creation after deserialization, using a Guice Provider and calling provider.get(), if there is a solution along those lines.
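One way to get that "delayed creation after deserialization" is to ship only a small serializable factory and keep the API itself in a transient field that is rebuilt on first use in each deserialized copy. The sketch below stays in plain Java so it runs without Beam or Guice; MicroServiceApi, SerializableSupplier, LazyApi and LazyApiDemo are hypothetical names, and the serializable supplier is a stand-in for a Guice Provider, not Guice's own type. In a real DoFn, the lazy step would naturally go in a @Setup method.

```java
import java.io.*;
import java.util.function.Supplier;

// Hypothetical API; neither the interface nor its implementations need to be Serializable.
interface MicroServiceApi {
    String writeRecord(String record);
}

// A Supplier that can be serialized; a lambda targeting it is serializable if it captures nothing unserializable.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Ships only the factory; the API instance is rebuilt lazily in every deserialized copy.
final class LazyApi implements Serializable {
    private final SerializableSupplier<MicroServiceApi> factory;
    // transient: skipped during serialization, so each copy starts with api == null.
    private transient MicroServiceApi api;

    LazyApi(SerializableSupplier<MicroServiceApi> factory) {
        this.factory = factory;
    }

    // The first call after deserialization creates the API.
    MicroServiceApi get() {
        if (api == null) {
            api = factory.get();
        }
        return api;
    }

    boolean isCreated() {
        return api != null;
    }
}

final class LazyApiDemo {
    // Serialize then deserialize, standing in for the runner shipping the holder to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The API object can be a plain, non-serializable implementation because it never crosses the serialization boundary; only the factory does. Pointing that factory at a static, per-JVM injector is close to what the Solution section does.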


Solution

  • Solved in a way that mocks no longer need static fields or serialization, by using one single class that bridges the world of Dataflow (in production and in tests), like so:

    NOTE: Our company has some additional magic that passes headers from service to service and through Dataflow, and some of it appears here; you can ignore it (i.e. RouterRequest request = Current.request();). Anyone else will have to pass projectId into getInstance each time.

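    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Module;
    import com.google.inject.util.Modules;
    import java.io.Serializable;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Serializable so it can be a field of a DoFn; all Guice state is static (per JVM) and never serialized.
    // OrderlyDataflowModule, Current and RouterRequest are company-specific classes.
    public abstract class DataflowClientFactory implements Serializable {
        private static final Logger log = LoggerFactory.getLogger(DataflowClientFactory.class);

        public static final String PROJECT_KEY = "projectKey";
        // Static fields are never serialized, so `transient` is unnecessary on them.
        private static Injector injector;
        private static Module overrides;

        private static final AtomicInteger counter = new AtomicInteger();

        public DataflowClientFactory() {
            // Java deserialization does not run this constructor, so this counts explicit constructions only.
            log.info("creating DataflowClientFactory. counter=" + counter.incrementAndGet());
        }

        // Tests call this before running the pipeline; it only affects workers in the same JVM (direct runner).
        public static void injectOverrides(Module dfOverrides) {
            synchronized (DataflowClientFactory.class) {
                overrides = dfOverrides;
            }
        }

        private void initialize(String project) {
            // Lock on the class, not `this`: the state is static and shared by every deserialized copy.
            synchronized (DataflowClientFactory.class) {
                if (injector != null)
                    return;

                /********************************************
                 * The hardest part is this piece since this is specific to each Dataflow
                 * so each project subclasses DataflowClientFactory
                 * This solution is the best ONLY in the fact of time crunch and it works
                 * decently for end to end testing without developers needing fancy
                 * wrappers around mocks anymore.
                 ***/
                Module module = loadProjectModule();

                Module modules = Modules.combine(module, new OrderlyDataflowModule(project));
                if (overrides != null) {
                    modules = Modules.override(modules).with(overrides);
                }

                injector = Guice.createInjector(modules);
            }
        }

        protected abstract Module loadProjectModule();

        public <T> T getInstance(Class<T> clazz) {
            if (!Current.isContextSet()) {
                throw new IllegalStateException("Someone on the stack is extending DoFn instead of OrderlyDoFn so you need to fix that first");
            }
            RouterRequest request = Current.request();
            String project = (String) request.requestState.get(PROJECT_KEY);

            initialize(project);
            return injector.getInstance(clazz);
        }

    }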
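The reason this works in tests is that injector and overrides are static: they live in the JVM rather than in the serialized object, so every deserialized copy of the factory resolves the same bindings, and with the direct runner that JVM is the test's own. Below is a plain-Java sketch of the same shape, assuming a map of bindings as a stand-in for the Guice Injector and passing the project into getInstance explicitly, as the NOTE suggests for readers without the Current context. ClientFactory, ProdFactory, RecordWriter, InMemoryWriter, Publisher and FactoryDemo are all hypothetical names. One caveat: on a real Dataflow worker, a static override set in the launching JVM does not exist, so there the injector is built from loadProjectModule() alone.

```java
import java.io.*;
import java.util.*;

// Hypothetical API the pipeline writes to.
interface RecordWriter {
    void writeRecord(String record);
}

// Plain-Java analogue of DataflowClientFactory: a map of bindings stands in for the Guice Injector.
abstract class ClientFactory implements Serializable {
    // Static, so never serialized: every deserialized copy in this JVM sees the same state.
    private static Map<Class<?>, Object> bindings;
    private static Map<Class<?>, Object> overrides;

    // Called by a test before the pipeline runs (only effective when workers share this JVM).
    static synchronized void injectOverrides(Map<Class<?>, Object> testBindings) {
        overrides = testBindings;
    }

    // Analogue of loadProjectModule(); here the project is passed in explicitly.
    protected abstract Map<Class<?>, Object> loadProjectBindings(String project);

    <T> T getInstance(String project, Class<T> clazz) {
        synchronized (ClientFactory.class) { // same lock as injectOverrides
            if (bindings == null) {
                Map<Class<?>, Object> combined = new HashMap<>(loadProjectBindings(project));
                if (overrides != null) {
                    combined.putAll(overrides); // test bindings win, like Modules.override(...).with(...)
                }
                bindings = combined;
            }
            return clazz.cast(bindings.get(clazz));
        }
    }
}

// Hypothetical production factory; the real binding would call the microservice.
final class ProdFactory extends ClientFactory {
    @Override
    protected Map<Class<?>, Object> loadProjectBindings(String project) {
        Map<Class<?>, Object> m = new HashMap<>();
        RecordWriter real = record -> {
            throw new UnsupportedOperationException("would call the real service for " + project);
        };
        m.put(RecordWriter.class, real);
        return m;
    }
}

// Plain test double: ordinary instance state, no statics, not Serializable.
final class InMemoryWriter implements RecordWriter {
    final List<String> written = new ArrayList<>();

    @Override
    public void writeRecord(String record) {
        written.add(record);
    }
}

// Stand-in for ValidRecordPublisher: serializes the factory, never the API.
final class Publisher implements Serializable {
    private final ClientFactory factory;
    private final String project;

    Publisher(ClientFactory factory, String project) {
        this.factory = factory;
        this.project = project;
    }

    void process(String element) {
        factory.getInstance(project, RecordWriter.class).writeRecord(element);
    }
}

final class FactoryDemo {
    // Serialize then deserialize, standing in for the runner shipping the DoFn.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The test double keeps plain instance fields because the factory, not the mock, is what gets serialized; after the round trip, the copy's lookup still returns the very InMemoryWriter the test registered.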