Tags: python, glob, snakemake, directed-acyclic-graphs

How to make rule "all" in Snakefile condition on completion of parallel wildcard rule


I have some TGZ files containing audio SPH samples, which I unpack in snakemake like this:

tgz_files = ["a.tgz", "b.tgz"]
tgz_dirs = ["a", "b"]
rule untar_tgz_files:
     input:
        tgz_files
     output:
        directory(tgz_dirs)
     shell:
        "tar -xzvf {input}"

I don't know the names of the SPH sample files until after the untar. I then have a rule which translates the SPH files to WAV files, like this:

rule sph_to_wav:
     input:
        "{root}/{filename}.sph"
     output:
        "{root}_wav/{filename}.wav"
     shell:
        "sox -t sph {input} -b 16 -t wav {output}"

I want my Snakefile to run both of these steps (untar and convert), not knowing in advance the exact names of the SPH files in the TGZ archives. I need something like this to mark the completion of the sph_to_wav rule:

rule sph_to_wav_finished:
     input:
        "{root}_wav/{filename}.wav"
     output:
        "sph_to_wav_finished.txt"

and then I want to condition rule all on both of these processes:

rule all:
     input:
         tgz_dirs, "sph_to_wav_finished.txt"

However, I get the error:

Building DAG of jobs...
MissingInputException in Snakefile:
Missing input files for rule all:
sph_to_wav_finished.txt

How do I write this so that Snakemake

  • doesn't complain and runs both the unpack and the SPH-to-WAV conversion, and
  • runs the SPH-to-WAV conversion only after the unpack?


Solution

  • This sounds like a use-case for a checkpoint. Since rule untar_tgz_files generates files that are not known in advance, you can convert it into a checkpoint:

    checkpoint untar_tgz_files:
        ... # everything defined as in a regular rule
    

    This tells Snakemake that once the checkpoint has completed, the DAG must be re-evaluated to take the newly created files into account.

    Downstream rules need to find out about the new files, so you typically use some form of globbing (glob.glob, or Snakemake's glob_wildcards) inside an input function to list them. Here is a rough idea, which you may need to fine-tune:

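    def list_new_files(wildcards):
        # .get() makes Snakemake wait for the checkpoint, then re-evaluate the DAG.
        # The checkpoint in the question has no wildcards, so none are passed here.
        output_dirs = checkpoints.untar_tgz_files.get().output
        new_files = []
        for root in output_dirs:
            # discover the .sph files that the untar step actually produced
            filenames = glob_wildcards(root + "/{filename}.sph").filename
            # request the matching outputs of rule sph_to_wav
            new_files += expand("{root}_wav/{filename}.wav", root=root, filename=filenames)
        return new_files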
    

    Finally, collect all the translated files with:

    rule sph_to_wav_finished:
         input:
            list_new_files,
         output:
            "sph_to_wav_finished.txt"
         shell:
            "touch {output}"
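
    To see what the input function has to compute, independent of Snakemake, here is a minimal plain-Python sketch of the globbing step. The helper name wav_targets_for is hypothetical and not part of Snakemake; the "{root}_wav/{filename}.wav" layout is assumed from the sph_to_wav rule in the question.

```python
from pathlib import Path


def wav_targets_for(root):
    """Map every .sph file found under `root` to its expected .wav target.

    Hypothetical helper: mirrors the pattern "{root}_wav/{filename}.wav"
    from the sph_to_wav rule, using only the standard library.
    """
    root = Path(root)
    # "a" -> "a_wav", placed next to the unpacked directory
    wav_root = root.with_name(root.name + "_wav")
    targets = []
    # rglob also descends into subdirectories, so nested samples are found too
    for sph in root.rglob("*.sph"):
        relative = sph.relative_to(root).with_suffix(".wav")
        targets.append((wav_root / relative).as_posix())
    return sorted(targets)
```

    In a Snakefile, a list like this would be what the input function returns once the checkpoint has finished, so it should only be evaluated after the directory exists.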