Using checkpoint in snakemake to recover unknown output files with more than one wildcard


I would like to use checkpoints in Snakemake to collect output files that are not known before execution. I have followed the example here: https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution. However, the input function that collects the checkpoint outputs raises an error if the output path includes more than one wildcard.

I have tried to create a small simplified example below based on the example provided in the documentation.

I am assuming I need to adjust the aggregate_input function to accept the sample wildcard too. Would anyone be able to advise?

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt"

# the checkpoint that shall trigger re-evaluation of the DAG
# a number of files are created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}")
    shell:
        '''
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
         '''

# input function for rule aggregate, return paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
    return expand("results/{sample}/{i}.txt", sample=wildcards.sample, i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i)

rule aggregate:
    input:
        aggregate_input
    output:
        "aggregated.txt"
    shell:
        "cat {input} > {output}"

This generates the error below:

InputFunctionException in rule aggregate in file /gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile, line 26:
Error:
  WorkflowError:
    Missing wildcard values for sample
Wildcards:

Traceback:
  File "/gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile", line 23, in aggregate_input

Solution

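  • I think the issue here is that when checkpoints.somestep.get(**wildcards) is called, wildcards.sample is not defined. The output of the aggregate rule ("aggregated.txt") contains no {sample} wildcard, so the wildcards object passed to the input function is empty. You can work around this by looping over the samples explicitly: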

    SAMPLE = ["A", "B", "C", "D", "E"]
    
    
    # a target rule to define the desired final output
    rule all:
        input:
            "aggregated.txt",
    
    
    # the checkpoint that shall trigger re-evaluation of the DAG
    # a number of files are created in a defined directory
    checkpoint somestep:
        output:
            directory("results/{sample}"),
        shell:
            """
            mkdir -p results
            mkdir results/{wildcards.sample}
            cd results/{wildcards.sample}
            for i in 1 2 3; do touch $i.txt; done
            """
    
    
    # input function for rule aggregate, return paths to all files produced by the checkpoint 'somestep'
    def aggregate_input(wildcards):
        out = []
        for sample in SAMPLE:
            checkpoint_output = checkpoints.somestep.get(**{"sample": sample}).output[0]
            out.extend(
                expand(
                    "results/{sample}/{i}.txt",
                    sample=sample,
                    i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
                )
            )
        return out
    
    
    rule aggregate:
        input:
            aggregate_input,
        output:
            "aggregated.txt",
        shell:
            "cat {input} > {output}"
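
As a rough illustration of what the loop in aggregate_input gathers, here is a plain-Python sketch that runs outside Snakemake. It assumes the checkpoint directories already exist (it fakes them in a temporary directory) and uses glob in place of glob_wildcards, so it is a stand-in rather than Snakemake's own mechanism. The helper names make_fake_checkpoint_output and collect_outputs are hypothetical.

```python
import glob
import os
import tempfile

SAMPLE = ["A", "B", "C"]


def make_fake_checkpoint_output(root, samples):
    """Hypothetical helper: create results/<sample>/{1,2,3}.txt under root,
    mimicking what the 'somestep' checkpoint writes."""
    for sample in samples:
        sample_dir = os.path.join(root, "results", sample)
        os.makedirs(sample_dir, exist_ok=True)
        for i in (1, 2, 3):
            open(os.path.join(sample_dir, f"{i}.txt"), "w").close()


def collect_outputs(root, samples):
    """Hypothetical helper: list every .txt file of every sample,
    one sample at a time, like the loop over SAMPLE above."""
    out = []
    for sample in samples:
        sample_dir = os.path.join(root, "results", sample)
        # sorted() makes the order deterministic; glob gives no ordering guarantee
        out.extend(sorted(glob.glob(os.path.join(sample_dir, "*.txt"))))
    return out


with tempfile.TemporaryDirectory() as root:
    make_fake_checkpoint_output(root, SAMPLE)
    files = collect_outputs(root, SAMPLE)
    # paths relative to the temporary root, e.g. results/A/1.txt
    relative = [os.path.relpath(f, root) for f in files]

print(len(relative))
```

The point of the sketch is only that the sample name has to come from somewhere: here, as in the answer's loop, it comes from the SAMPLE list rather than from a wildcard.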
    

    In my experience with checkpoints, I would more typically do something like this, aggregating per sample first so that the sample wildcard is supplied by the rule's own output:

    SAMPLE = ["A", "B", "C", "D", "E"]
    
    
    # a target rule to define the desired final output
    rule all:
        input:
            "aggregated.txt",
    
    
    # the checkpoint that shall trigger re-evaluation of the DAG
    # a number of files are created in a defined directory
    checkpoint somestep:
        output:
            directory("results/{sample}"),
        shell:
            """
            mkdir -p results
            mkdir results/{wildcards.sample}
            cd results/{wildcards.sample}
            for i in 1 2 3; do touch $i.txt; done
            """
    
    
    # input function for rule aggregate, return paths to all files produced by the checkpoint 'somestep'
    def aggregate_input(wildcards):
        checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
        return expand(
            "results/{sample}/{i}.txt",
            sample=wildcards.sample,
            i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
        )
    
    
    rule aggregate_per_sample:
        input:
            aggregate_input,
        output:
            "{sample}_aggregated.txt",
        shell:
            "cat {input} > {output}"
    
    
    rule aggregate_samples:
        input:
            expand("{sample}_aggregated.txt", sample=SAMPLE),
        output:
            "aggregated.txt",
        shell:
            "cat {input} > {output}"
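
To make the difference between the two designs concrete, the following plain-Python sketch mimics how wildcard values are derived by matching a requested file against a rule's output pattern. It is a simplification and not Snakemake's actual implementation: the function infer_wildcards is hypothetical, each wildcard is assumed to match ".+", and repeated wildcard names are not handled.

```python
import re


def infer_wildcards(pattern, target):
    """Hypothetical helper: match `target` against an output pattern that
    contains {name} placeholders. Returns a dict of wildcard values, or
    None if the target does not match the pattern."""
    regex = ""
    pos = 0
    for m in re.finditer(r"\{(\w+)\}", pattern):
        # literal text before the placeholder must match exactly
        regex += re.escape(pattern[pos:m.start()])
        # the placeholder itself becomes a named group matching one or more characters
        regex += f"(?P<{m.group(1)}>.+)"
        pos = m.end()
    regex += re.escape(pattern[pos:])
    match = re.fullmatch(regex, target)
    return match.groupdict() if match else None


# Per-sample rule: the output pattern supplies a value for 'sample'.
print(infer_wildcards("{sample}_aggregated.txt", "A_aggregated.txt"))  # {'sample': 'A'}

# Original aggregate rule: no placeholder in the output, so no wildcard values.
print(infer_wildcards("aggregated.txt", "aggregated.txt"))  # {}
```

With "{sample}_aggregated.txt" as the output, the input function receives a sample value and checkpoints.somestep.get(**wildcards) can resolve. With a plain "aggregated.txt" output, the wildcards are empty, which matches the "Missing wildcard values for sample" error in the question.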