
Dataflow: DoFn to return KV after a Join


It is the first time I'm dealing with a performance-critical Dataflow job. While trying to fix the performance issue I'm having, I started to introduce this kind of pattern in order to try to minimise the number of method calls.

Code before

PCollection<KV<Long, TableRow>> result = Join.leftOuterJoin("Joining AlfaCollection with BetaCollection", alfaCollection, betaCollection, Beta.newBuilder().build())
    .apply(ParDo.of(ToTableRowParDo.builder().build()))
    .apply("Keying via my_field", WithKeys.of((TableRow tr) -> (Long) tr.get("my_field")));

Code after

PCollection<KV<Long, TableRow>> result = Join.leftOuterJoin("Joining AlfaCollection with BetaCollection", alfaCollection, betaCollection, Beta.newBuilder().build())
    .apply(ParDo.of(ToTableRowKVParDo.builder().build()));

where ToTableRowKVParDo is implemented like this:

@Builder
public class ToTableRowKVParDo extends DoFn<KV<Alfa, Beta>, KV<Long, TableRow>> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        KV<Alfa, Beta> kv = c.element();
        Alfa a = kv.getKey();
        TableRow tr = new TableRow();
        tr.set(....);
        tr.set(....);

        c.output(KV.of(a.getKey(), tr));
    }

}

That said, my pipeline is still suffering from performance issues. My question for now is: is this considered a good approach/pattern or not? Which version of the code should be considered better than the other?


Solution

  • Dataflow will automatically fuse these steps together; it is generally best practice to use distinct Beam operations for distinct logical operations (just as one would with functions). It is rare for method calls to be the performance issue; instead, I would look at reducing the data that needs to be shuffled, for example by filtering rows or dropping unneeded columns before the join (see the sketch below).
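
For example, a minimal sketch of trimming the left input before the join (Alfa, isActive(), getKey(), getMyField() and the builder methods are placeholders inferred from the question's code, not a known API):

import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Drop rows that cannot contribute to the result and keep only the columns
// used downstream, so the join has less data to shuffle.
PCollection<Alfa> trimmedAlfa = alfaCollection
    .apply("Filter Alfa rows", Filter.by((Alfa a) -> a.isActive()))
    .apply("Project Alfa columns", MapElements
        .into(TypeDescriptor.of(Alfa.class))
        .via((Alfa a) -> Alfa.newBuilder()
            .setKey(a.getKey())
            .setMyField(a.getMyField())
            .build()));

trimmedAlfa would then be passed to Join.leftOuterJoin in place of alfaCollection; the rest of the pipeline stays the same.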