I have a general question on side inputs and broadcasting in the context of Apache Beam. Do any additional variables, lists, or maps that are needed for computation during processElement have to be passed as side inputs? Is it OK if they are passed as normal constructor arguments to the DoFn? For example, suppose I have some fixed (not computed) values (constants, like a start date and an end date) that I want to make use of during the per-element computation in processElement. I could make singleton PCollectionViews out of each of those variables separately and pass them to the DoFn constructor as side inputs. However, instead of doing that, can I not just pass each of those constants as normal constructor arguments to the DoFn? Am I missing anything subtle here?
In terms of code, when should I do:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>>, KV<String, Iterable<MyData>>> {

  // these are singleton views
  private final PCollectionView<LocalDateTime> dateStartView;
  private final PCollectionView<LocalDateTime> dateEndView;

  public MyFilter(PCollectionView<LocalDateTime> dateStartView,
                  PCollectionView<LocalDateTime> dateEndView) {
    this.dateStartView = dateStartView;
    this.dateEndView = dateEndView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // extract the date values from the singleton views here and use them,
    // e.g. LocalDateTime dateStart = c.sideInput(dateStartView);
  }
}
As opposed to:
public static class MyFilter extends DoFn<KV<String, Iterable<MyData>>, KV<String, Iterable<MyData>>> {

  private final LocalDateTime dateStart;
  private final LocalDateTime dateEnd;

  public MyFilter(LocalDateTime dateStart,
                  LocalDateTime dateEnd) {
    this.dateStart = dateStart;
    this.dateEnd = dateEnd;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // use the passed-in date values directly here
  }
}
Notice that in these examples, dateStart and dateEnd are fixed values and not the dynamic results of any previous computation in the pipeline.
When you call something like pipeline.apply(ParDo.of(new MyFilter(...))), the DoFn gets instantiated in the main program that you use to start the pipeline. It then gets serialized and passed to the runner for execution. The runner then decides where to execute it, e.g. on a fleet of 100 VMs, each of which will receive its own copy of the code and the serialized data. If the member variables are serializable and you don't mutate them at execution time, this should be fine (link, link): the DoFn will get deserialized on each node with all of its fields populated and will get executed as expected. However, you don't control the number of instances or, to some extent, their lifecycle, so mutate them at your own risk.
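For example, here is a minimal sketch of wiring the constructor-argument version into a pipeline (the grouped collection and the concrete dates are assumptions for illustration; java.time.LocalDateTime is immutable and implements Serializable, so it is safe to capture as a DoFn field):

// Assumed upstream result, e.g. the output of a GroupByKey
PCollection<KV<String, Iterable<MyData>>> grouped = ...;

// The dates are plain serializable constants baked into the DoFn instance
grouped.apply("FilterByDateRange",
    ParDo.of(new MyFilter(
        LocalDateTime.of(2020, 1, 1, 0, 0),
        LocalDateTime.of(2020, 12, 31, 23, 59))));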
The benefit of PCollections and side inputs is that you are not limited to static values, so for a couple of simple immutable values you should be fine.
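For instance, here is a hypothetical sketch where the date bounds are computed by the pipeline itself rather than fixed at construction time, which is exactly the case constructor arguments cannot handle (the timestamps collection, and using Min/Max to derive the bounds, are assumptions for illustration):

// Some collection of timestamps derived from the data earlier in the pipeline
PCollection<LocalDateTime> timestamps = ...;

// Reduce to single values and expose them as singleton views
PCollectionView<LocalDateTime> dateStartView =
    timestamps.apply(Min.globally()).apply(View.asSingleton());
PCollectionView<LocalDateTime> dateEndView =
    timestamps.apply(Max.globally()).apply(View.asSingleton());

// The views must be declared on the ParDo so the runner can broadcast them
grouped.apply("FilterByDateRange",
    ParDo.of(new MyFilter(dateStartView, dateEndView))
        .withSideInputs(dateStartView, dateEndView));

Inside processElement, c.sideInput(dateStartView) then returns the computed value on whichever worker processes the element, which no constructor argument could provide.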