Tags: google-cloud-dataflow, google-cloud-functions, google-api-nodejs-client

ValueProvider Issue


I am trying to get the value of a property that is passed from a Cloud Function to a Dataflow template. I get errors because the value being passed is a ValueProvider wrapper, and calling .get() on it fails during the compile (when the template is built) with this error: An exception occurred while executing the Java class. null: InvocationTargetException: Not called from a runtime context.

public interface MyOptions extends DataflowPipelineOptions {
    ...
    @Description("schema of csv file")
    ValueProvider<String> getHeader();
    void setHeader(ValueProvider<String> header);
    ...
}

public static void main(String[] args) throws IOException {
...
    List<String> sideInputColumns = Arrays.asList(options.getHeader().get().split(","));
...
    //ultimately use the getHeaders as side inputs
    PCollection<String> input = p.apply(Create.of(sideInputColumns));
    final PCollectionView<List<String>> finalColumnView = input.apply(View.asList());
}

How do I extract the value from the ValueProvider type?


Solution

  • The value of a ValueProvider is not available during pipeline construction, which is when main() runs to build the template, so you cannot call .get() there. Instead, organize your pipeline so that it always has the same structure regardless of the value, and pass the ValueProvider itself into the transforms that need it; it is serialized along with them. At runtime, those transforms can call .get() to determine how to operate.

    Based on your example, you may need something like the following. It creates a single element and then uses a DoFn, evaluated at runtime, to expand the header into one element per column:

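    import java.io.IOException;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // Splits the header into one output element per column. The ValueProvider
    // is serialized with the DoFn and only read when the DoFn runs.
    public static class HeaderDoFn extends DoFn<String, String> {
      private final ValueProvider<String> header;

      public HeaderDoFn(ValueProvider<String> header) {
        this.header = header;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        // Ignore the input element -- there should be exactly one.
        for (String column : header.get().split(",")) {
          c.output(column);
        }
      }
    }

    public static void main(String[] args) throws IOException {
      MyOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
      Pipeline p = Pipeline.create(options);

      PCollection<String> input = p
        .apply(Create.of("one"))                               // create a single element
        .apply(ParDo.of(new HeaderDoFn(options.getHeader()))); // expand it at runtime

      // Note that the order of this list is not guaranteed.
      final PCollectionView<List<String>> finalColumnView =
        input.apply(View.asList());

      // Use finalColumnView as a side input in later transforms, then run.
      p.run();
    }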
    

    Another option is to use a NestedValueProvider to derive a ValueProvider<List<String>> from the option, and pass that ValueProvider<List<String>> directly to the DoFns that need it instead of using a side input.
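The NestedValueProvider approach can be sketched roughly as follows. This is a minimal sketch assuming the Apache Beam Java SDK core library is on the classpath; `HeaderColumns`, `columnsOf`, and `LabelFieldsFn` are hypothetical names, and `StaticValueProvider` stands in for the template parameter so the split can be checked locally. In a real template you would pass `options.getHeader()` to `columnsOf` instead.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class HeaderColumns {

  // Hypothetical helper: wraps the raw "a,b,c" header option in a provider
  // that splits it lazily. Nothing is evaluated here, so it is safe to call
  // during pipeline construction.
  static ValueProvider<List<String>> columnsOf(ValueProvider<String> header) {
    SerializableFunction<String, List<String>> splitter =
        h -> Arrays.asList(h.split(","));
    return NestedValueProvider.of(header, splitter);
  }

  // Hypothetical DoFn: pairs each field of a CSV line with its column name.
  // The provider is serialized with the DoFn and only read in processElement.
  public static class LabelFieldsFn extends DoFn<String, String> {
    private final ValueProvider<List<String>> columns;

    public LabelFieldsFn(ValueProvider<List<String>> columns) {
      this.columns = columns;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      List<String> names = columns.get(); // resolved at runtime, not at construction
      String[] fields = c.element().split(",");
      for (int i = 0; i < fields.length && i < names.size(); i++) {
        c.output(names.get(i) + "=" + fields[i]);
      }
    }
  }

  public static void main(String[] args) {
    // StaticValueProvider stands in for a template parameter in this local check.
    ValueProvider<List<String>> cols = columnsOf(StaticValueProvider.of("id,name,email"));
    System.out.println(cols.get()); // prints [id, name, email]
  }
}
```

Because NestedValueProvider only applies the split function when get() is called, wiring something like `ParDo.of(new LabelFieldsFn(columnsOf(options.getHeader())))` into the pipeline inside main() should not trigger the "runtime context" error; the header is only read on the workers.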