Search code examples
snakemake

Checkpoints, unknown files, intermediate processing - missing wildcards problem


I am trying to use checkpoints for the first time to split a file into multiple fixed-sized chunks, do something to each chunk, and then merge those processed chunked files.

Here is a trimmed-down implementation I have, that doesn't work. First rule splits the requested fastq file into fixed size chunks, and it's a checkpoint:

checkpoint chunk_fastq:
    input:
        fastq=lambda wc: (
            f"LIBRARY_RUN_FASTQS[wc.library][wc.run]["12".index(wc.side)]
        ),
    params:
        chunksize=lambda wc: config["map"]["chunksize"] * 4,
    threads: 4
    output:
        directory("outputs/fastq_chunks/{library}.{run}.{side,[12]}"),
    shell:
        """
        mkdir -p {output};
        zcat {input.fastq} | split -l {params.chunksize} -d \
            --filter 'bgzip -c -@ {threads} > $FILE.fastq.gz' - \
            {output}/
        """

Then I have two intermediate rules connected by a pipe that should process each chunk generated in the checkpoint in the way I need:

rule map_chunks_bwa:
    input:
        reads=lambda wildcards: expand(
            "outputs/fastq_chunks/{library}.{run}.{side}/{chunk_id}.fastq.gz",
            library=wildcards.library,
            run=wildcards.run,
            side=[1, 2],
            chunk_id=wildcards.chunk_id,
        ),
        reference=config["genome"]["custom_genome_path"],
        idx=idx,
    params:
        bwa=config["map"]["mapper"],
        extra="-SP",
        sort="none",
        dedup="none",
    log:
        "logs/bwa_memx/{library}.{run}.{chunk_id}.log",
    threads: 8
    output:
        pipe(f"outputs/mapped_chunks/{{library}}.{{run}}.{{chunk_id}}.bam"),
    wrapper:
        "v1.24.0/bio/bwa-memx/mem"

rule parse_sort_chunks:
    input:
        bams="outputs/mapped_chunks/{library}.{run}.{chunk_id}.bam",
        chromsizes=config["genome"]["chrom_sizes_path"],
    threads: 8
    params:
        dropsam_flag="--drop-sam" if config["parse"].get("drop_sam", False) else "",
        dropreadid_flag="--drop-readid"
        if config["parse"].get("drop_readid", False)
        else "",
        dropseq_flag="--drop-seq" if config["parse"].get("drop_seq", True) else "",
        parsing_options=config["parse"].get("parsing_options", ""),
        keep_bams_command=f"| tee >(samtools view -bS > outputs/mapped_chunks/{{library}}.{{run}}.{{chunk_id}}.{custom_genome_name}.bam)"
        if config["parse"]["keep_unparsed_bams"]
        else "",
    output:
        f"outputs/parsed_chunks/{{library}}.{{run}}.{{chunk_id,[0-9]+}}.{custom_genome_name}.pairs.gz",
    shell:
        """
        cat {input.bams} {params.keep_bams_command} |
        pairtools parse {params.dropsam_flag} {params.dropreadid_flag} {params.dropseq_flag} \
        {params.parsing_options} \
        -c {input.chromsizes} \
        | pairtools sort --nproc {threads} \
        -o {output} \
        """

Finally, I need to merge the output of the previous rule, generating one file for each library - combining all runs together. I know the list of all runs for each library, I knows that for each run there is always two values of side (1 or 2). I don't know how many and what chunk_id I have.

With the implementation below the problem is that my merging function only has one wildcard - library, and missing the run and side checkpoints. Hence, I can not check the output of the checkpoint - it needs to have all the wildcards!

def get_pair_chunks(wildcards):
    checkpoint_output = checkpoints.chunk_fastq.get(**wildcards).output[0]
    return expand(
        "outputs/parsed_chunks/{library}.{run}.{chunk_id}.{custom_genome_name}.pairs.gz",
        library=wildcards.library,
        run=wildcards.run,

        chunk_id=glob_wildcards(
            f"outputs/fastq_chunks/{wildcards.library}.{wildcards.run}.{wildcards.side}/{{chunk_id}}.fastq.gz",
        ).chunk_id,
        custom_genome_name=custom_genome_name,
    )

rule merge_sort_dedup:
    input:
        pairs=get_pair_chunks,
    params:
        dedup_options=lambda wildcards: config["dedup"].get("dedup_options", ""),
    threads: 8
    output:
        multiext(
            f"outputs/library_pairs/{{library}}.{custom_genome_name}",
            ".nodups.pairs.gz",
            ".nodups.bam",
            ".unmapped.pairs.gz",
            ".unmapped.bam",
            ".dups.pairs.gz",
            ".dups.bam",
            ".dedup.stats",
        ),
     shell:
         f"""pairtools merge {{input.pairs}}) --nproc {{threads}}""" # and more stuff here

Is there a way to solve this problem?


Solution

  • I think you will have to merge in two steps. First merge all your chunks back into a file that still contains the run and side wildcards. And in a second step merge all the runs (and sides?) per library.

    Generally speaking, you have to options:

    1. Your aggregation step input function has exactly the same wildcards as your checkpoint-scattering rule. For this, the docs give a good example: https://snakemake.readthedocs.io/en/latest/snakefiles/rules.html#data-dependent-conditional-execution
    2. Your aggregation step input function determines the value of certain parts of the input file name that are wildcards in the checkpoint rule from other logic, for example by parsing some samples.tsv file or by looking stuff up in the config dictionary.

    Also, a further sidenote: if you can make the number of chunks produced at the scattering step fixed (and often you can), then the snakemake scatter-gather process is a much more efficient way to encode this. checkpoints are always a bit hacky and create quite a bit of overhead in the handling of the directed acyclic graph (DAG) of rule dependencies.