
Access elements of PCollectionView<List<Foo>> : Google Cloud Dataflow/Apache Beam


I have a PCollection which I want to pass as a side-input and access its elements in a ParDo.

I've created a PCollectionView of it as:

final PCollectionView<List<Foo>> view =
    myPCollection.apply(View.asList());

How do I access its elements in a ParDo when it is passed as a side input?

An example would really help.

Thank You


Solution

  • This snippet mainly comes from the Beam programming guide.

    final PCollectionView<List<Foo>> view =
        myPCollection.apply(View.asList());

    PCollection<String> resultingPCollection =
        someOtherPCollection.apply(ParDo
            .of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Inside the DoFn the view resolves to an ordinary List<Foo>.
                List<Foo> mySideInput = c.sideInput(view);
                // Do something with side input
              }
            }).withSideInputs(view)); // the view must be registered here
    

    If you don't want to use an anonymous DoFn, you can pass the PCollectionView to a named DoFn through its constructor and read it in processElement. The view still has to be registered with withSideInputs(view):

    final PCollectionView<List<Foo>> view =
        myPCollection.apply(View.asList());

    PCollection<String> resultingPCollection =
        someOtherPCollection.apply(ParDo
            .of(new MyDoFn(view)).withSideInputs(view));

    // If this class is nested inside another class, declare it static so it
    // does not drag the enclosing instance along when it is serialized.
    class MyDoFn extends DoFn<String, String> {
      // Handle to the side input; the actual list is fetched per element.
      final PCollectionView<List<Foo>> view;

      MyDoFn(PCollectionView<List<Foo>> view) {
        this.view = view;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        List<Foo> mySideInput = c.sideInput(this.view);
        // Do something with side input
      }
    }
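
In both versions, the value returned by c.sideInput(view) is a plain java.util.List<Foo>, so its elements are read the usual way: by iterating or by index. The sketch below fills in the "do something" step in plain Java so it can be tested without a pipeline. Everything in it is an assumption made for illustration: the question never shows Foo, so the Foo class and its name field are hypothetical stand-ins, and SideInputLogic with its two methods is a made-up helper, not part of Beam.

```java
import java.util.List;

// Hypothetical stand-in for the question's Foo type (the real class is not shown).
class Foo {
  final String name;

  Foo(String name) {
    this.name = name;
  }
}

// Hypothetical helper: per-element logic kept outside the DoFn so it can be
// exercised with an ordinary List<Foo>.
class SideInputLogic {

  // Iteration: returns true if any side-input element has the given name.
  static boolean isKnown(String element, List<Foo> sideInput) {
    for (Foo foo : sideInput) {
      if (foo.name.equals(element)) {
        return true;
      }
    }
    return false;
  }

  // Index access: returns the name of the first element, or null if the
  // list is empty.
  static String firstNameOrNull(List<Foo> sideInput) {
    return sideInput.isEmpty() ? null : sideInput.get(0).name;
  }
}
```

Inside processElement this could be called as SideInputLogic.isKnown(c.element(), c.sideInput(view)). Since a PCollection has no defined element order, it is probably safer not to rely on which Foo ends up at a particular index of the list.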