This is my first time dealing with a performance-critical Dataflow job. While trying to fix the performance issue I'm having, I started to introduce the following kind of pattern in order to minimise the number of method calls.
Code before
PCollection<KV<Long, TableRow>> result =
    Join.leftOuterJoin("Joining AlfaCollection with BetaCollection",
            alfaCollection, betaCollection, Beta.newBuilder().build())
        .apply(ParDo.of(ToTableRowParDo.builder().build()))
        .apply("Keying via my_field",
            WithKeys.of((TableRow tr) -> (Long) tr.get("my_field")));
Code after
PCollection<KV<Long, TableRow>> result =
    Join.leftOuterJoin("Joining AlfaCollection with BetaCollection",
            alfaCollection, betaCollection, Beta.newBuilder().build())
        .apply(ParDo.of(ToTableRowKVParDo.builder().build()));
where ToTableRowKVParDo is implemented like this:
@Builder
public class ToTableRowKVParDo extends DoFn<KV<Long, KV<Alfa, Beta>>, KV<Long, TableRow>> {

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // The join library emits KV<joinKey, KV<left, right>>.
        KV<Alfa, Beta> kv = c.element().getValue();
        Alfa a = kv.getKey();
        TableRow tr = new TableRow();
        tr.set(....);
        tr.set(....);
        c.output(KV.of(a.getKey(), tr));
    }
}
That said, my pipeline is still suffering from performance issues, so my question for now is: is this considered a good approach/pattern, or is it not? Which version of the code should be considered better than the other?
Dataflow will automatically fuse these steps together; it is generally best practice to use distinct Beam operations for distinct logical operations (just as one would with functions). It is rare for method calls to be the performance issue; instead I would look for things like reducing the data that needs to be shuffled (by filtering rows or columns if possible before the join).
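To make that last suggestion concrete, here is a minimal sketch of filtering the right-hand side before the join so that fewer elements are shuffled. The isRelevant predicate is a hypothetical placeholder for whatever condition identifies the Beta rows you actually need, and the Long-keyed collection types are assumed from the snippets in your question:

import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Drop unneeded Beta rows before the join. The filter runs before the
// shuffle that the join introduces, so discarded rows never move
// between workers. isRelevant is a hypothetical predicate.
PCollection<KV<Long, Beta>> slimBetaCollection = betaCollection
    .apply("Filtering Beta before join",
        Filter.by((KV<Long, Beta> kv) -> isRelevant(kv.getValue())));

PCollection<KV<Long, TableRow>> result = Join.leftOuterJoin(
        "Joining AlfaCollection with BetaCollection",
        alfaCollection, slimBetaCollection, Beta.newBuilder().build())
    .apply(ParDo.of(ToTableRowKVParDo.builder().build()));

The same idea applies to columns: mapping Beta to a slimmer intermediate type before the join reduces the size of each shuffled element, not just the element count.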