Tags: java, google-cloud-dataflow, apache-beam

How to apply PTransform to PCollection conditionally?


I have a PCollection and want to apply a custom PTransform only if a condition holds. The condition does not depend on the contents of the PCollection.

Example: I have logs, and if a date is provided in PipelineOptions, I want to filter on that date.

Right now, the best solution I have is:

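// Read file. TextIO.read() yields PCollection<String>; the step that parses lines into LogRow is omitted here.
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()));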
if(!date.equals("")){
    logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...

It works, but I don't like reassigning logs, and I like breaking the chain of apply calls even less. It doesn't look like an elegant way to do it.

Is there some kind of conditional PTransform? If not, would it be more efficient to put the condition check inside the PTransform and output every element when the condition is not met?

Dream example:

PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
    .applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
    .apply(...

Solution

  • Unfortunately, Beam does not have anything similar to applyIf. Your current approach is the usual way to do this kind of conditional filtering.

    A conditional check inside the PTransform adds an extra operation for every element, which affects performance to a degree that depends on the cost of the check.

    If possible, it is better to leave the transform out of the pipeline than to make the PTransform more complex.

    From a code-aesthetics point of view, you can use a wrapper transform to conditionally apply the relevant filter ParDo. Example:

    public static class ConditionallyFilter
          extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
      private final String date;
      public ConditionallyFilter(String date){
        this.date = date;
      }
      @Override
      public PCollection<LogRow> expand(PCollection<LogRow> logs) {
        if(!date.equals("")){
          logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
        }
        return logs;
      }
    }

    // Read File
    PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
        .apply(new ConditionallyFilter(date))
        .apply(...
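
The idea behind the wrapper can be sketched without Beam: decide once, while the chain is being built, whether a stage is included, so no per-element check is paid when the condition is false. The sketch below is a plain-Java stand-in using only the standard library. `ConditionalStages`, `applyIf`, `filterOnDate`, and `run` are hypothetical names for illustration, not Beam API, and the filter rule (lines starting with the date) is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Plain-Java stand-in for the pattern; none of these names are Beam API.
final class ConditionalStages {

  // Returns `stage` when `condition` holds, otherwise an identity stage.
  // The condition is evaluated once, when the chain is built, not per element.
  static <T> UnaryOperator<List<T>> applyIf(boolean condition, UnaryOperator<List<T>> stage) {
    return condition ? stage : UnaryOperator.<List<T>>identity();
  }

  // Hypothetical filter stage: keeps only rows that start with `date`.
  static UnaryOperator<List<String>> filterOnDate(String date) {
    return rows -> rows.stream()
        .filter(row -> row.startsWith(date))
        .collect(Collectors.toList());
  }

  // Builds the (possibly empty) filter stage and runs it over the rows.
  static List<String> run(List<String> rows, String date) {
    return applyIf(!date.isEmpty(), filterOnDate(date)).apply(rows);
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("2024-01-01 start", "2024-01-02 stop");
    System.out.println(run(rows, "2024-01-02")); // prints [2024-01-02 stop]
    System.out.println(run(rows, ""));           // prints [2024-01-01 start, 2024-01-02 stop]
  }
}
```

With an empty date the filter lambda is never invoked, which mirrors leaving the transform out of the pipeline rather than checking the condition inside it.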