
Snakemake checkpoint: create an expand list/wildcards based on created output files


I hope someone can guide me in the right direction. See below for a small working example:

import os
from glob import glob

rule all:
    input:
        expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])

checkpoint split_to_fasta:
    input:
        seqfile = "files/seqs.csv"  # just an empty file in this example
    output:
        fastas=directory("output/fasta")
    shell:
        # A Python script will create the files below; I don't know their names beforehand.
        "mkdir -p {output.fastas} ; "
        "echo test > {output.fastas}/1_LEFT_sample_name_foo.fa ; "
        "echo test > {output.fastas}/1_RIGHT_sample_name_foo.fa ; "
        "echo test > {output.fastas}/2_LEFT_sample_name_spam.fa ; "
        "echo test > {output.fastas}/2_RIGHT_sample_name_bla.fa ; "
        "echo test > {output.fastas}/3_LEFT_sample_name_egg.fa ; "
        "echo test > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"

rule step2:
    input:
        fasta = "output/fasta/{fasta}.fa"
    output:
        step2 = "output/step2/{fasta}_step2.txt",
    shell:
        "cp {input.fasta} {output.step2}"

rule step3:
    input:
        file = rules.step2.output.step2
    output:
        step3 = "output/step3/{fasta}_step3.txt",
    shell:
        "cp {input.file} {output.step3}"

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
    # Don't know where to use this line correctly:
    # files = [Path(x).stem.split("_")[0] for x in glob("output/step3/*_step3.txt")]
    return expand("output/step3/{fasta}_step3.txt",
                  fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta)

def get_id_files(wildcards):
    blast = glob(f"output/step3/{wildcards.number}*_step3.txt")
    return sorted(blast)

rule step4:
    input:
        step3files = aggregate_input,
        idfiles = get_id_files
    output:
        step4 = "output/step4/{number}_step4.txt",
    run:
        shell("cat {input.idfiles} > {output.step4}")

Because of rule all, Snakemake knows how to "start" the pipeline. I hard-coded the numbers 1, 2, 3 and 4, but in a real situation I don't know these numbers beforehand.

expand("output/step4/{number}_step4.txt", number=["1","2","3","4"])

What I want is to derive those numbers from the output filenames of split_to_fasta, step2 or step3, and then use them as wildcard targets. (I can easily get the numbers with glob and split.)

I want to do it with wildcards as in def get_id_files because I want to execute the next step in parallel. In other words, the following sets of files need to go into the next step together (a plain-Python sketch of this grouping follows the list):

[1_LEFT_sample_name_foo.fa, 1_RIGHT_sample_name_foo.fa]
[2_LEFT_sample_name_spam.fa, 2_RIGHT_sample_name_bla.fa]
[3_LEFT_sample_name_egg.fa]
[4_RIGHT_sample_name_ham.fa]
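
For illustration, a plain-Python sketch (outside of Snakemake) of how I would compute that grouping; the paths match the example above:

    from collections import defaultdict
    from pathlib import Path

    # Group the fasta files by the number before the first underscore.
    groups = defaultdict(list)
    for fa in sorted(Path("output/fasta").glob("*.fa")):
        groups[fa.stem.split("_")[0]].append(fa.name)

    # groups == {"1": ["1_LEFT_sample_name_foo.fa", "1_RIGHT_sample_name_foo.fa"], ...}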

I may need a second checkpoint, but I am not sure how to implement that.

EDIT (solution):

I was already worried my question was not so clear, so I made another example, see below. This pipeline generates some fake files (end of step 3). From this point I want to continue and process all files with the same id in parallel. The id is the number at the beginning of the filename. I could make a second pipeline that "starts" with step 4 and execute them one after the other, but that sounds like bad practice. I think I need to define a target for the next rule (step 4) but don't know how to do that in this situation. The code to define the target itself is something like:

    from pathlib import Path
    from glob import glob

    files = [Path(x).stem.split("_")[0] for x in glob("output/step3/*_step3.txt")]
    ids = list(set(files))
    expand("output/step4/{number}_step4.txt", number=ids)

The second example (edited to include the solution):

import os
from glob import glob
from pathlib import Path

def aggregate_input(wildcards):
    checkpoint_output = checkpoints.split_to_fasta.get(**wildcards).output[0]
    ids = [Path(x).stem.split("_")[0] for x in glob("output/fasta/*.fa")]
    return (expand("output/step3/{fasta}_step3.txt",
                   fasta=glob_wildcards(os.path.join(checkpoint_output, "{fasta}.fa")).fasta)
            + expand("output/step4/{number}_step4.txt", number=ids))

rule all:
    input:
        aggregate_input,

checkpoint split_to_fasta:
    input:
        seqfile = "files/seqs.csv"
    output:
        fastas=directory("output/fasta")
    shell:
        # A Python script will create the files below; I don't know them beforehand.
        # I could get the numbers if needed.
        "mkdir -p {output.fastas} ; "
        "echo test1 > {output.fastas}/1_LEFT_sample_name_foo.fa ; "
        "echo test2 > {output.fastas}/1_RIGHT_sample_name_foo.fa ; "
        "echo test3 > {output.fastas}/2_LEFT_sample_name_spam.fa ; "
        "echo test4 > {output.fastas}/2_RIGHT_sample_name_bla.fa ; "
        "echo test5 > {output.fastas}/3_LEFT_sample_name_egg.fa ; "
        "echo test6 > {output.fastas}/4_RIGHT_sample_name_ham.fa ;"

rule step2:
    input:
        fasta = "output/fasta/{fasta}.fa"
    output:
        step2 = "output/step2/{fasta}_step2.txt",
    shell:
        "cp {input.fasta} {output.step2}"

rule step3:
    input:
        file = rules.step2.output.step2
    output:
        step3 = "output/step3/{fasta}_step3.txt",
    shell:
        "cp {input.file} {output.step3}"

def get_id_files(wildcards):
    # blast = glob(f"output/step3/{wildcards.number}*_step3.txt")
    blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt",
                   sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)
    return blast

rule step4:
    input:
        idfiles = get_id_files
    output:
        step4 = "output/step4/{number}_step4.txt",
    run:
        shell("cat {input.idfiles} > {output.step4}")

Solution

  • Replace the blast line in get_id_files in the second example with

    blast = expand(f"output/step3/{wildcards.number}_{{sample}}_step3.txt",
                   sample=glob_wildcards(f"output/fasta/{wildcards.number}_{{sample}}.fa").sample)


    This is my way of understanding checkpoints: when the input of a rule (say rule a) comes from a checkpoint, everything upstream of a is hidden from the first evaluation of the DAG. Only after the checkpoint has been executed successfully does the second round of evaluation start, this time knowing the checkpoint's outputs.

    So in your case, feeding the checkpoint into rule all hides step2/3/4 during the first evaluation (since these steps are upstream of all). Then the checkpoint is executed, followed by the second evaluation. At this point you are evaluating a new workflow that knows all outputs of the checkpoint, so you can 1. infer the ids and 2. infer the corresponding step3 outputs from the split_to_fasta outputs.

    1st evaluation: rule all -> checkpoint split_to_fasta (TBD)

    2nd evaluation (split_to_fasta executed): rule all -> checkpoint split_to_fasta -> rule step4 -> rule step3 -> rule step2

    get_id_files is evaluated when the DAG for step4 is built, at which point step3 has not been executed yet; this is why you need to infer the inputs from the outputs of split_to_fasta instead of directly looking for the outputs of step3.
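
    As a possible hardening (a sketch, not from the original answer): calling checkpoints.split_to_fasta.get() inside get_id_files itself makes the checkpoint dependency explicit, instead of relying on aggregate_input having already forced the checkpoint to run:

    def get_id_files(wildcards):
        # Block until split_to_fasta has run: .get() raises an exception during
        # the first DAG evaluation, deferring this function until the
        # checkpoint's output directory exists.
        ck_output = checkpoints.split_to_fasta.get().output.fastas
        samples = glob_wildcards(
            os.path.join(ck_output, f"{wildcards.number}_{{sample}}.fa")).sample
        return expand("output/step3/{number}_{sample}_step3.txt",
                      number=wildcards.number, sample=samples)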