Search code examples
pythonpipelineduckdbdagsterdata-engineering

Parallel, depth-first ops in Dagster with ops, graphs and jobs together


(also posted on r/dagster)

Dagster N00b here.

I have a very specific use-case. My ETL executes the following steps:

  1. Query a DB to get a list of CSV files
  2. Go to a filesystem and for each CSV file:
  • load it into DuckDB
  • transform some columns to date
  • transform some numeric codes to text categories
  • export clean table to a .parquet file
  • run a profile report for the clean data

The DuckDB tables are named just the same as the CSV files for convenience.

2a through 2e can be done in parallel FOR EACH CSV FILE. Within the context of a single CSV file, they need to run SERIALLY.

My current code is:

@op
def get_csv_filenames(context) -> List[str]:

@op(out=DynamicOut())
def generate_subtasks(context, csv_list:List[str]):
  for csv_filename in csv_list:
    yield DynamicOutput(csv_filename, mapping_key=csv_filename)

def load_csv_into_duckdb(context, csv_filename)

def transform_dates(context, csv_filename)

def from_code_2_categories(context, csv_filename)

def export_2_parqu

Solution

  • If I understand correctly, you want depth-first processing, instead of breadth first? I think you might be able to trigger depth-first processing using a nested graph after the dynamic output step. You're also conceptually missing how to set dependencies between ops in Dagster. Something like this should work:

    @op
    def get_csv_filenames(context) -> List[str]:
    
    @op(out=DynamicOut())
    def generate_subtasks(context, csv_list:List[str]):
      for csv_filename in csv_list:
        yield DynamicOutput(csv_filename, mapping_key=csv_filename)
    
    @op
    def load_csv_into_duckdb(context, csv_filename)
      ...
      return csv_filename
    
    @op
    def transform_dates(context, csv_filename)
      ...
      return csv_filename
    
    @op
    def from_code_2_categories(context, csv_filename)
      ...
      return csv_filename
    
    @op
    def export_2_parquet(context, csv_filename)
      ...
      return csv_filename
    
    @op
    def profile_dataset(context, csv_filename)
      ...
      return csv_filename
    
    @graph
    def process(context, csv_filename:str):
      profile_dataset(export_2_parquet(from_code_2_categories(transform_dates(load_csv_into_duckdb(csv_filename)))))
    
      
    @job
    def pipeline():
      csv_filename_list = get_csv_filenames()
      generate_subtasks(csv_filename_list).map(process)