
Is there a way to upload jars for a dataflow job so we don't have to serialize everything?


As an example, I remember that in Hadoop I could either make my classes serializable or give a path to the JARs needed for my job. I am wondering whether the same is true for a Dataflow job, so that all of our clients can be packaged in JAR files and made available to all the workers.

In our case, we have a MicroserviceApi, a generated client, etc., and would prefer to write output to that downstream microservice without having to make it serializable.

Is there a method to do this?


Solution

    1. First, a clarification about serialization

    When you add implements Serializable to a class in Java, you make it such that object instances of that class can be serialized (not the class itself). The destination JVM needs to have access to the class to be able to understand the serialized instances that you send to it. So, in fact you always need to provide the JAR for it.
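To make this concrete, here is a small plain-Java sketch (no Beam involved). HoldsClient implements Serializable, but its client field has a hypothetical FakeClient type that does not, so Java serialization fails with NotSerializableException as soon as it reaches that field. Adding implements Serializable to the outer class is not enough: every object reachable through its non-transient fields must be serializable too.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a generated client; it does NOT implement Serializable.
class FakeClient {
    final String endpoint;
    FakeClient(String endpoint) { this.endpoint = endpoint; }
}

// Implements Serializable, but holds a reference to a non-serializable object.
class HoldsClient implements Serializable {
    private static final long serialVersionUID = 1L;
    final String config;      // fine: String is Serializable
    final FakeClient client;  // problem: FakeClient is not

    HoldsClient(String config) {
        this.config = config;
        this.client = new FakeClient(config);
    }
}

class SerializationDemo {
    // Returns true if Java serialization rejects obj (or something it references).
    static boolean failsToSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failsToSerialize("plain string"));      // false
        System.out.println(failsToSerialize(new HoldsClient("c"))); // true
    }
}
```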

    Beam has code to automatically find all JARs in your classpath and upload them to Dataflow or whatever runner you're using, so if the JAR is in your classpath, then you don't need to worry about it (if you're using Maven/Gradle and specifying it as a dependency, then you're most likely fine).
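As a rough illustration of what "find all JARs in your classpath" means, the sketch below reads the JVM's java.class.path system property and keeps the entries that end in .jar. This is not Beam's actual staging code, which does more than this (for example, it may also package class directories); jarEntries is a hypothetical helper written only for this example.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

class ClasspathJars {
    // Splits a classpath string on the platform separator (':' or ';')
    // and keeps only the entries that look like JAR files.
    static List<String> jarEntries(String classpath) {
        List<String> jars = new ArrayList<>();
        for (String entry : classpath.split(File.pathSeparator)) {
            if (entry.endsWith(".jar")) {
                jars.add(entry);
            }
        }
        return jars;
    }

    public static void main(String[] args) {
        // The running JVM's classpath, e.g. as assembled by Maven or Gradle.
        for (String jar : jarEntries(System.getProperty("java.class.path"))) {
            System.out.println(jar);
        }
    }
}
```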


    2. Now, how can I use a class in Beam if it's not serializable?

    In Beam, the important part is figuring out where and when the different parts of your pipeline code execute. Some things execute at pipeline construction time and some execute at pipeline execution time.

    Things that run at construction time

    • Constructors for all your classes (DoFns, PTransforms, etc.)
    • The expand method of your PTransforms

    Things that run at execution time

    • For your DoFns: the @ProcessElement, @StartBundle, @FinishBundle, @Setup, and @Teardown methods.
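The split between the two phases can be simulated in plain Java. The sketch below is not Beam's API: SimulatedFn and SimClient are hypothetical stand-ins, setup() plays the role of a @Setup method, and ship() uses Java serialization to mimic a runner sending the object to a worker. The log shows that the constructor runs once, before serialization, and that deserializing the copy does not run it again, so the transient client only exists after setup() runs on the copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical non-serializable client used only for this simulation.
class SimClient {
    private final String name;
    SimClient(String name) { this.name = name; }
    String call(String element) { return name + ":" + element; }
}

// Plain-Java stand-in for a DoFn (NOT Beam's API); records which phase each method ran in.
class SimulatedFn implements Serializable {
    private static final long serialVersionUID = 1L;
    // Static, so it is not serialized; shared by every copy in this JVM.
    static final List<String> LOG = new ArrayList<>();
    private final String config;        // serialized and shipped with the object
    private transient SimClient client; // skipped by serialization

    SimulatedFn(String config) {        // construction time
        this.config = config;
        LOG.add("construct");
    }

    void setup() {                      // analogous to a @Setup method
        client = new SimClient(config);
        LOG.add("setup");
    }

    String process(String element) {    // analogous to a @ProcessElement method
        return client.call(element);
    }
}

class LifecycleDemo {
    // Serializes fn and reads it back, mimicking a runner shipping it to a worker.
    static SimulatedFn ship(SimulatedFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SimulatedFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the phase log followed by the result of processing one element.
    static List<String> run() {
        SimulatedFn.LOG.clear();
        SimulatedFn fn = new SimulatedFn("svc"); // construction time
        SimulatedFn workerCopy = ship(fn);       // deserializing does not re-run the constructor
        workerCopy.setup();                      // execution time, on the deserialized copy
        List<String> result = new ArrayList<>(SimulatedFn.LOG);
        result.add(workerCopy.process("x"));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [construct, setup, svc:x]
    }
}
```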

    If your class does not implement Serializable but you want to use it at execution time, then you need to create it at execution time. For example, suppose that you have a DoFn:

    import org.apache.beam.sdk.transforms.DoFn;

    class MyDoFnImplementation extends DoFn<String, String> {
    
      // All members of the object need to be serializable. String is easily serializable.
      private final String config;
      // Your MicroserviceApi is *not* serializable, so mark it as transient.
      // The transient keyword tells Java serialization to skip this field.
      private transient MicroserviceApi client;
    
      public MyDoFnImplementation(String configuration) {
        // This code runs at *construction time*.
        // Anything you store here is serialized and sent to the runner.
        this.config = configuration;
      }
    
      @ProcessElement
      public void process(ProcessContext c) {
        // This code runs at execution time, so you can create your client here.
        // The null check ensures it is created only once per DoFn instance.
        // You can also create it in a @Setup or @StartBundle method.
        if (client == null) {
          client = new MicroserviceApi(this.config);
        }
        // Call the client here; this example just passes the element through.
        c.output(c.element());
      }
    }
    

    By ensuring that these objects are created at execution time, you avoid the need to make them serializable; only the configuration used to build them has to be serializable.
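The same pattern can be checked outside Beam with a plain-Java analogue of MyDoFnImplementation. StubMicroserviceApi is a hypothetical, deliberately non-serializable stand-in for the generated client, and roundTrip mimics a runner shipping the object to a worker. Only the String configuration crosses the serialization boundary; the transient client comes back as null and is rebuilt on first use, once per copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the generated MicroserviceApi client; deliberately NOT Serializable.
class StubMicroserviceApi {
    static int instancesCreated = 0; // counts constructions in this JVM
    private final String config;

    StubMicroserviceApi(String config) {
        this.config = config;
        instancesCreated++;
    }

    String call(String element) { return config + "/" + element; }
}

// Plain-Java analogue of MyDoFnImplementation (no Beam dependency).
class LazyClientFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String config;                  // serializable configuration
    private transient StubMicroserviceApi client; // rebuilt lazily after deserialization

    LazyClientFn(String config) { this.config = config; }

    String process(String element) {
        if (client == null) {
            client = new StubMicroserviceApi(config); // only on the first call per copy
        }
        return client.call(element);
    }

    boolean hasClient() { return client != null; }

    // Serializes and deserializes fn, as a runner would when shipping it to a worker.
    static LazyClientFn roundTrip(LazyClientFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyClientFn original = new LazyClientFn("cfg");
        original.process("warm-up");                // the original now holds a client
        LazyClientFn copy = roundTrip(original);
        System.out.println(copy.hasClient());       // false: the transient field was skipped
        System.out.println(copy.process("a"));      // cfg/a
        System.out.println(copy.process("b"));      // cfg/b (client reused)
        System.out.println(StubMicroserviceApi.instancesCreated); // 2: one per copy
    }
}
```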