Search code examples
bioinformaticssnakemake

Building a workflow with dynamic input without using deprecated dynamic API


I'd like to make a workflow which downloads the list of some FASTQ files from the remote server, checks md5 and runs some post-processing, e.g. aligning.

I understand how to implement this using two workflows:

  1. first download fastq files list file, e.g. md5 file.

  2. read the md5 file content and create corresponding targets in all rule for desired resulting files.

I'd like to do this in a single workflow. The incorrect workflow below shows the idea what I'd like to achieve.

  • in all rule input: section I don't know {sample} values before md5 file is download and parsed

  • I've tried to play with dynamic, checkpoints and subforkflows, but failed to achieve the desired result. As for dynamic I've managed only to implement this workflow only for dynamic("fastq/{sample}.fq.gz.md5") output.

  • Also, I'm interested in a solution which doesn't use dynamic because it is deprecated.

rule all:
    input:
         "md5",
         "bams/{sample}.bam",

rule download_files_list:
    output: "md5"
    #shell: "wget {}".format(config["url_files_list"])
    run:
        # For testing instead of downloading:
        content = """
        bfe583337fd68b3  ID_001_1.fq.gz
        1636b6756daa65f  ID_001_2.fq.gz
        0428baf25307249  ID_002_1.fq.gz
        de33d81ba5bfa62  ID_002_2.fq.gz
        """.strip()
        with open(output[0], mode="w") as f:
            print(content, file=f)

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: "mkdir -p fastq && awk '{{ print $0 > (\"fastq/\" $2 \".md5\") }}' {input}"

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task

Solution

  • I've seen a lot of confusion over how to use the new checkpoint feature. Here is an illustrative example, simplified:

    shell.prefix('set -vexu pipefail; ')
    
    rule finish:
            input:
                    "D/all.txt"
    
    checkpoint A:
            output:
                    mydir = directory("A")
            shell: """
                    mkdir -p A
                    N=$(( $RANDOM % 7 + 1))
                    echo "N=$N"
                    # Create a number of files. (
                    for i in $(seq 1 $N); do
                            echo $i > "A/$i.txt"
                    done
            """
    
    rule B:
            output:
                    txt = "B/{i}.txt",
            input:
                    txt = "A/{i}.txt",
            shell: """
                    mkdir -p B
                    cp -f {input.txt} {output.txt}
            """
    
    rule C:
            output:
                    txt = "C/{i}.txt",
            input:
                    txt = "B/{i}.txt",
            shell: """
                    mkdir -p C
                    cp -f {input.txt} {output.txt}
            """
    
    def gatherForD_fromC_basedOnA(wildcards):
            checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
            # That will raise if not ready.
    
            ivals = glob_wildcards(os.path.join(checkpoint_output,
                            "{i}.txt")).i
            print("ivals={}".format(ivals))
            return expand("C/{i}.txt", i=ivals)
    
    rule D:
            output:
                    combined = "D/all.txt",
            input:
                    gathered = gatherForD_fromC_basedOnA,
            shell: """
                    mkdir -p D
                    cat {input.gathered} > {output.combined}
            """
    

    Copy to snakefile and run it with

    snakemake --verbose -p
    
    • Checkpoint/Rule A outputs a random number of files. (You could base those on an "input" section instead, of course.)
    • Rules B and C are parallel rules using standard snakemake "wildcards".
    • Rule D takes an unknown number of inputs, based on an input-generating function.
    • Function gatherForD_fromC_basedOnA waits for the output of checkpoint-rule A, but it names the output of rule C, which is finally consumed by rule D. As a result, snakemake knows what D will consume (after A is done). So it knows what C must produce. So it knows what B must produce.
    • Finally, rule finish waits for a specific, known file.