
Access sideinput inside ParDo


I am new to Apache Beam, and I am investigating the use of a side input for one of our use cases. Below is the code.

import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionView;

PipelineOptions options = PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
final List<String> input = Arrays.asList("a", "b", "c", "d");

// Materialize the side input as a PCollectionView so the DoFn can read it.
PCollectionView<List<String>> sideinput =
        pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());

pipeline.apply("read", Create.of(input))
        .apply("process", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext pc) {
                System.out.println("processing element:" + pc.element());
                List<String> list = pc.sideInput(sideinput);
                for (String element : list) {
                    System.out.print(element);
                }
                System.out.println("");
            }
        }).withSideInputs(sideinput));

pipeline.run();

I am expecting it to print out all the side input elements after each element, e.g.:

processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234

However, the results are different each time:

processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32

Or

processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31

31

Solution

  • It's rather expected, since in a distributed environment there is no guarantee about the order in which input elements are processed, nor about the order in which the aggregated console output appears. You may want to concatenate the main element and the side input elements and write them out in one shot to get the output you expect, as sketched below.
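
For illustration, here is a minimal sketch of that approach, reusing the pipeline, input, and sideinput view from the question. Each element builds its entire output first and prints it with a single call, so the line for one element cannot be interleaved with the line for another:

pipeline.apply("read", Create.of(input))
        .apply("process", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext pc) {
                // Build the whole output for this element before printing it.
                StringBuilder sb = new StringBuilder();
                sb.append("processing element:").append(pc.element()).append("\n");
                for (String element : pc.sideInput(sideinput)) {
                    sb.append(element);
                }
                // One println per element keeps the element and its side input
                // values together, even though elements are processed in
                // parallel and in arbitrary order.
                System.out.println(sb.toString());
            }
        }).withSideInputs(sideinput));
pipeline.run();

Note that the per-element blocks can still appear in any order; only the pairing of each element with its side input values is now guaranteed.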