I would like to use checkpoints in Snakemake to recover output files that are unknown before execution. I have followed the examples here https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution. However, the function that is used to recover the checkpoint outputs throws an error if the output path includes more than one wildcard.
I have tried to create a small simplified example below based on the example provided in the documentation.
I am assuming I need to adjust the aggregate_input function to accept the sample wildcard too. Would anyone be able to advise?
SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt"

# the checkpoint that shall trigger re-evaluation of the DAG
# a number of files are created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}")
    shell:
        '''
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        '''

# input function for rule aggregate, return paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
    return expand("results/{sample}/{i}.txt", sample=wildcards.sample, i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i)

rule aggregate:
    input:
        aggregate_input
    output:
        "aggregated.txt"
    shell:
        "cat {input} > {output}"
This generates the error below:
InputFunctionException in rule aggregate in file /gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile, line 26:
Error:
WorkflowError:
Missing wildcard values for sample
Wildcards:
Traceback:
File "/gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile", line 23, in aggregate_input
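I think the issue here is that when checkpoints.somestep.get(**wildcards) is called, wildcards.sample is not defined. The output of the aggregate rule ("aggregated.txt") contains no {sample} wildcard, so the wildcards object passed to the input function is empty, which matches the empty "Wildcards:" line in the error. You can overcome this by looping over the samples explicitly, like so: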
import os  # explicit import; used by os.path.join below

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt",

# the checkpoint that shall trigger re-evaluation of the DAG
# a number of files are created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}"),
    shell:
        """
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        """

# input function for rule aggregate, return paths to all files produced by the checkpoint 'somestep'
# rule aggregate has no wildcards, so request the checkpoint once per sample
def aggregate_input(wildcards):
    out = []
    for sample in SAMPLE:
        checkpoint_output = checkpoints.somestep.get(**{"sample": sample}).output[0]
        out.extend(
            expand(
                "results/{sample}/{i}.txt",
                sample=sample,
                i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
            )
        )
    return out

rule aggregate:
    input:
        aggregate_input,
    output:
        "aggregated.txt",
    shell:
        "cat {input} > {output}"
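In my experience with checkpoints, I more typically aggregate per sample first, so that the sample wildcard is defined for the input function, and then combine the per-sample files, like this: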
import os  # explicit import; used by os.path.join below

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt",

# the checkpoint that shall trigger re-evaluation of the DAG
# a number of files are created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}"),
    shell:
        """
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        """

# input function for rule aggregate_per_sample, return paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
    return expand(
        "results/{sample}/{i}.txt",
        sample=wildcards.sample,
        i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
    )

# the {sample} wildcard in the output defines wildcards.sample for aggregate_input
rule aggregate_per_sample:
    input:
        aggregate_input,
    output:
        "{sample}_aggregated.txt",
    shell:
        "cat {input} > {output}"

# combine the per-sample files into the final output
rule aggregate_samples:
    input:
        expand("{sample}_aggregated.txt", sample=SAMPLE),
    output:
        "aggregated.txt",
    shell:
        "cat {input} > {output}"