
Cross Validation using Dagster


I've started using Dagster in our ML pipeline, and I'm running into some basic issues. I'm wondering whether I'm missing something trivial here, or whether this is just how it is...

Say I have a simple ML pipeline:

Load raw data --> Process data into table --> Split train / test --> train model --> evaluate model.

A linear pipeline like that is straightforward in Dagster. But what if I want to add a little loop, say for cross-validation purposes:

Load raw data --> Process data into table --> Split into k folds, and for each fold:
  - fold 1: train model --> evaluate
  - fold 2: train model --> evaluate
  - fold 3: train model --> evaluate
  --> summarize cross validation results.

Is there a nice & clean way to do this in Dagster? The way I've been doing things is:

Load raw data --> Process data into table --> Split into K folds --> choose fold k --> train model --> evaluate model

With the fold "k" as an input parameter for the pipeline. And then running the pipeline K times.
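For concreteness, the split-and-choose steps I have in mind look roughly like this in plain Python (the function names and the index-striding scheme are just illustrative):

```python
def split_into_k_folds(rows, k):
    # Partition row indices into k roughly equal folds by striding.
    return [rows[i::k] for i in range(k)]


def choose_fold(folds, fold_index):
    # Hold out one fold for evaluation; train on all the others.
    test_rows = folds[fold_index]
    train_rows = [row for j, fold in enumerate(folds)
                  if j != fold_index for row in fold]
    return train_rows, test_rows
```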

What am I missing here?


Solution

  • Yes, Dagster supports having solids fan out into multiple solids and then fan in to a sink solid (i.e. to summarize results) within a single pipeline. Here is some example code and the corresponding DAG visualization in Dagit (full DAG and zoomed in).

    from dagster import (
        Output,
        OutputDefinition,
        composite_solid,
        pipeline,
        solid,
    )


    @solid
    def load_raw_data(_):
        yield Output('loaded_data')
    
    
    @solid
    def process_data_into_table(_, raw_data):
        yield Output(raw_data)
    
    
    @solid(
        output_defs=[
            OutputDefinition(name='fold_one', dagster_type=int, is_required=True),
            OutputDefinition(name='fold_two', dagster_type=int, is_required=True),
        ],
    )
    def split_into_two_folds(_, table):
        yield Output(1, 'fold_one')
        yield Output(2, 'fold_two')
    
    
    @solid
    def train_fold(_, fold):
        yield Output('model')
    
    
    @solid
    def evaluate_fold(_, model):
        yield Output('compute_result')
    
    
    @composite_solid
    def process_fold(fold):
        return evaluate_fold(train_fold(fold))
    
    
    @solid
    def summarize_results(context, fold_1_result, fold_2_result):
        yield Output('summary_stats')
    
    
    @pipeline
    def ml_pipeline():
        fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data()))
    
        process_fold_one = process_fold.alias('process_fold_one')
        process_fold_two = process_fold.alias('process_fold_two')
    
        summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))
    

    In the example code, we use aliases to re-use the same logic for each fold, and we consolidate the per-fold logic (train, then evaluate) within the composite solid.
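Note that summarize_results above only yields a placeholder string; the aggregation it stands in for could be sketched in plain Python, e.g. reducing per-fold evaluation metrics to summary statistics (the metric values and dictionary keys here are made up):

```python
def summarize_cv(fold_metrics):
    # Reduce per-fold evaluation metrics to summary statistics.
    n = len(fold_metrics)
    mean = sum(fold_metrics) / n
    std = (sum((m - mean) ** 2 for m in fold_metrics) / n) ** 0.5
    return {'mean': mean, 'std': std, 'n_folds': n}
```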

    Another option is to programmatically construct a PipelineDefinition directly, but I would recommend the approach above.