I'd like to make a workflow which downloads a list of FASTQ files from a remote server, checks md5 sums, and runs some post-processing, e.g. alignment. I understand how to implement this using two workflows: first download the FASTQ files list (e.g. an md5 file), then read the md5 file's content and create the corresponding targets in the all rule for the desired result files.
I'd like to do this in a single workflow. The incorrect workflow below shows the idea of what I'd like to achieve: in the all rule's input: section I don't know the {sample} values before the md5 file is downloaded and parsed.
I've tried to play with dynamic, checkpoints and subworkflows, but failed to achieve the desired result. With dynamic I've only managed to implement this workflow for a dynamic("fastq/{sample}.fq.gz.md5") output. Also, I'm interested in a solution which doesn't use dynamic, because it is deprecated.
rule all:
    input:
        "md5",
        "bams/{sample}.bam",

rule download_files_list:
    output: "md5"
    # shell: "wget {}".format(config["url_files_list"])
    run:
        # For testing instead of downloading:
        content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
""".strip()
        with open(output[0], mode="w") as f:
            print(content, file=f)

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: "mkdir -p fastq && awk '{{ print $0 > (\"fastq/\" $2 \".md5\") }}' {input}"

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    # shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}"

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}"  # aligning task
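For reference, deriving the {sample} values from the md5 listing is trivial once the file exists; the difficulty is purely that Snakemake needs those values before building the DAG. A minimal parser (the samples_from_md5 helper is my own illustration, not a Snakemake API):

```python
def samples_from_md5(md5_text):
    """Extract sample names (e.g. "ID_001_1") from a listing whose
    lines have the form "<md5sum> <sample>.fq.gz"."""
    samples = []
    for line in md5_text.strip().splitlines():
        checksum, filename = line.split()
        if filename.endswith(".fq.gz"):
            samples.append(filename[: -len(".fq.gz")])
    return samples

content = """
bfe583337fd68b3 ID_001_1.fq.gz
1636b6756daa65f ID_001_2.fq.gz
0428baf25307249 ID_002_1.fq.gz
de33d81ba5bfa62 ID_002_2.fq.gz
"""
print(samples_from_md5(content))
# ['ID_001_1', 'ID_001_2', 'ID_002_1', 'ID_002_2']
```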
I've seen a lot of confusion over how to use the new checkpoint
feature. Here is an illustrative example, simplified:
shell.prefix('set -vexu -o pipefail; ')  # pipefail must be given via -o
rule finish:
    input:
        "D/all.txt"

checkpoint A:
    output:
        mydir = directory("A")
    shell: """
        mkdir -p A
        N=$(( $RANDOM % 7 + 1))
        echo "N=$N"
        # Create a random number of files.
        for i in $(seq 1 $N); do
            echo $i > "A/$i.txt"
        done
    """

rule B:
    output:
        txt = "B/{i}.txt",
    input:
        txt = "A/{i}.txt",
    shell: """
        mkdir -p B
        cp -f {input.txt} {output.txt}
    """

rule C:
    output:
        txt = "C/{i}.txt",
    input:
        txt = "B/{i}.txt",
    shell: """
        mkdir -p C
        cp -f {input.txt} {output.txt}
    """

def gatherForD_fromC_basedOnA(wildcards):
    checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
    # That will raise if not ready.
    ivals = glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i
    print("ivals={}".format(ivals))
    return expand("C/{i}.txt", i=ivals)

rule D:
    output:
        combined = "D/all.txt",
    input:
        gathered = gatherForD_fromC_basedOnA,
    shell: """
        mkdir -p D
        cat {input.gathered} > {output.combined}
    """
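The key call in the gather function is glob_wildcards, which extracts wildcard values from filenames matching a pattern. Conceptually it behaves like the simplified re-implementation below for the "{i}.txt" case (Snakemake's real glob_wildcards also scans the filesystem and handles arbitrary patterns):

```python
import re

def glob_i(filenames):
    """Extract the {i} wildcard value from each name matching "{i}.txt"."""
    ivals = []
    for f in filenames:
        m = re.match(r"(.+)\.txt$", f)
        if m:
            ivals.append(m.group(1))
    return ivals

ivals = glob_i(["1.txt", "2.txt", "3.txt"])
# expand("C/{i}.txt", i=ivals) then yields the gather targets:
targets = ["C/{}.txt".format(i) for i in ivals]
print(targets)  # ['C/1.txt', 'C/2.txt', 'C/3.txt']
```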
Copy that into a Snakefile and run it with:

    snakemake --verbose -p
- A outputs a random number of files. (You could base those on an "input" section instead, of course.)
- B and C are parallel rules using standard snakemake "wildcards".
- D takes an unknown number of inputs, based on an input-generating function.
- gatherForD_fromC_basedOnA waits for the output of checkpoint-rule A, but it names the output of rule C, which is finally consumed by rule D. As a result, snakemake knows what D will consume (after A is done), so it knows what C must produce, and so what B must produce.
- finish waits for a specific, known file.
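Applied to the original md5 workflow, the same pattern would turn download_files_list into a checkpoint and let an input function read the md5 file to name the final BAMs. An untested sketch under those assumptions (the gather_bams function is my own illustration, and the wget command stands in for whatever download you use; the test stub from the question works here too):

```
checkpoint download_files_list:
    output: "md5"
    shell: "wget -O {output} {config[url_files_list]}"

def gather_bams(wildcards):
    # Raises until the checkpoint has run; Snakemake re-evaluates afterwards.
    md5_file = checkpoints.download_files_list.get(**wildcards).output[0]
    with open(md5_file) as f:
        samples = [line.split()[1][:-len(".fq.gz")] for line in f if line.strip()]
    return expand("bams/{sample}.bam", sample=samples)

rule all:
    input: gather_bams
```

The remaining rules (fastq_md5_files, download_fastq_and_check_md5, align_fastq) can stay as in the question; only the entry point changes.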