I am trying to use checkpoints for the first time to split a file into multiple fixed-size chunks, do something to each chunk, and then merge the processed chunk files.
Here is a trimmed-down implementation that doesn't work. The first rule splits the requested fastq file into fixed-size chunks, and it's a checkpoint:
checkpoint chunk_fastq:
    input:
        # Pick the side-1 or side-2 fastq for this library/run.
        fastq=lambda wc: LIBRARY_RUN_FASTQS[wc.library][wc.run]["12".index(wc.side)],
    params:
        # split counts lines; one fastq record is 4 lines, hence the factor of 4.
        chunksize=lambda wc: config["map"]["chunksize"] * 4,
    threads: 4
    output:
        directory("outputs/fastq_chunks/{library}.{run}.{side,[12]}"),
    shell:
        """
        mkdir -p {output};
        zcat {input.fastq} | split -l {params.chunksize} -d \
            --filter 'bgzip -c -@ {threads} > $FILE.fastq.gz' - \
            {output}/
        """
Then I have two intermediate rules connected by a pipe that should process each chunk generated in the checkpoint in the way I need:
rule map_chunks_bwa:
    input:
        reads=lambda wildcards: expand(
            "outputs/fastq_chunks/{library}.{run}.{side}/{chunk_id}.fastq.gz",
            library=wildcards.library,
            run=wildcards.run,
            side=[1, 2],
            chunk_id=wildcards.chunk_id,
        ),
        reference=config["genome"]["custom_genome_path"],
        idx=idx,
    params:
        bwa=config["map"]["mapper"],
        extra="-SP",
        sort="none",
        dedup="none",
    log:
        "logs/bwa_memx/{library}.{run}.{chunk_id}.log",
    threads: 8
    output:
        pipe("outputs/mapped_chunks/{library}.{run}.{chunk_id}.bam"),
    wrapper:
        "v1.24.0/bio/bwa-memx/mem"
rule parse_sort_chunks:
    input:
        bams="outputs/mapped_chunks/{library}.{run}.{chunk_id}.bam",
        chromsizes=config["genome"]["chrom_sizes_path"],
    threads: 8
    params:
        dropsam_flag="--drop-sam" if config["parse"].get("drop_sam", False) else "",
        dropreadid_flag="--drop-readid"
        if config["parse"].get("drop_readid", False)
        else "",
        dropseq_flag="--drop-seq" if config["parse"].get("drop_seq", True) else "",
        parsing_options=config["parse"].get("parsing_options", ""),
        # Optionally tee the unparsed BAM stream to disk before parsing.
        keep_bams_command=f"| tee >(samtools view -bS > outputs/mapped_chunks/{{library}}.{{run}}.{{chunk_id}}.{custom_genome_name}.bam)"
        if config["parse"]["keep_unparsed_bams"]
        else "",
    output:
        f"outputs/parsed_chunks/{{library}}.{{run}}.{{chunk_id,[0-9]+}}.{custom_genome_name}.pairs.gz",
    shell:
        """
        cat {input.bams} {params.keep_bams_command} |
        pairtools parse {params.dropsam_flag} {params.dropreadid_flag} {params.dropseq_flag} \
            {params.parsing_options} \
            -c {input.chromsizes} \
        | pairtools sort --nproc {threads} \
            -o {output}
        """
Finally, I need to merge the output of the previous rule, generating one file for each library, combining all runs together. I know the list of all runs for each library, and I know that for each run there are always two values of side (1 or 2). I don't know how many chunk_ids I have, or what they are.
With the implementation below, the problem is that my merging function only has one wildcard - library - and is missing the run and side wildcards. Hence, I cannot check the output of the checkpoint - it needs to have all the wildcards!
def get_pair_chunks(wildcards):
    # This fails: wildcards only carries {library}, but the checkpoint also
    # needs {run} and {side} to be resolved.
    checkpoint_output = checkpoints.chunk_fastq.get(**wildcards).output[0]
    return expand(
        "outputs/parsed_chunks/{library}.{run}.{chunk_id}.{custom_genome_name}.pairs.gz",
        library=wildcards.library,
        run=wildcards.run,
        chunk_id=glob_wildcards(
            f"outputs/fastq_chunks/{wildcards.library}.{wildcards.run}.{wildcards.side}/{{chunk_id}}.fastq.gz",
        ).chunk_id,
        custom_genome_name=custom_genome_name,
    )
rule merge_sort_dedup:
    input:
        pairs=get_pair_chunks,
    params:
        dedup_options=lambda wildcards: config["dedup"].get("dedup_options", ""),
    threads: 8
    output:
        multiext(
            f"outputs/library_pairs/{{library}}.{custom_genome_name}",
            ".nodups.pairs.gz",
            ".nodups.bam",
            ".unmapped.pairs.gz",
            ".unmapped.bam",
            ".dups.pairs.gz",
            ".dups.bam",
            ".dedup.stats",
        ),
    shell:
        "pairtools merge {input.pairs} --nproc {threads}"  # and more stuff here
Is there a way to solve this problem?
I think you will have to merge in two steps. First merge all your chunks back into a file that still contains the `run` and `side` wildcards. And in a second step merge all the `run`s (and `side`s?) per `library`.
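For instance, here is a minimal sketch of such a two-step merge, assuming the runs per library can be read from the config as `config["libraries"][library]["runs"]` (a hypothetical layout) and reusing `custom_genome_name` from your question:

import os

def get_run_pair_chunks(wildcards):
    # Resolve the checkpoint per (library, run); side "1" is enough here,
    # because split produces the same chunk count for both sides.
    checkpoint_output = checkpoints.chunk_fastq.get(
        library=wildcards.library, run=wildcards.run, side="1"
    ).output[0]
    return expand(
        "outputs/parsed_chunks/{library}.{run}.{chunk_id}.{custom_genome_name}.pairs.gz",
        library=wildcards.library,
        run=wildcards.run,
        chunk_id=glob_wildcards(
            os.path.join(checkpoint_output, "{chunk_id}.fastq.gz")
        ).chunk_id,
        custom_genome_name=custom_genome_name,
    )

# Step 1: merge chunks, keeping the {run} wildcard.
rule merge_run_pairs:
    input:
        pairs=get_run_pair_chunks,
    threads: 4
    output:
        f"outputs/run_pairs/{{library}}.{{run}}.{custom_genome_name}.pairs.gz",
    shell:
        "pairtools merge {input.pairs} --nproc {threads} -o {output}"

# Step 2: merge runs per library; the run list comes from the config,
# so no checkpoint is needed at this level.
rule merge_library_pairs:
    input:
        pairs=lambda wc: expand(
            "outputs/run_pairs/{library}.{run}.{custom_genome_name}.pairs.gz",
            library=wc.library,
            run=config["libraries"][wc.library]["runs"],  # hypothetical layout
            custom_genome_name=custom_genome_name,
        ),
    threads: 8
    output:
        f"outputs/library_pairs/{{library}}.{custom_genome_name}.pairs.gz",
    shell:
        "pairtools merge {input.pairs} --nproc {threads} -o {output}"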
Generally speaking, you have two options: you can determine which `run`s (and `side`s) belong to each `library` either from a `samples.tsv` file or by looking stuff up in the `config` dictionary.
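For the `samples.tsv` variant, something like this would work (the file name and column names are assumptions):

import pandas as pd

# Hypothetical sample sheet with one row per (library, run) combination.
samples = pd.read_csv("samples.tsv", sep="\t", dtype=str)

def runs_of_library(library):
    # All runs recorded for the given library.
    return sorted(samples.loc[samples["library"] == library, "run"].unique())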
Also, a further sidenote: if you can make the number of chunks produced at the scattering step fixed (and often you can), then the snakemake `scatter-gather` process is a much more efficient way to encode this (see the sketch below). `checkpoint`s are always a bit hacky and create quite a bit of overhead in the handling of the directed acyclic graph (DAG) of rule dependencies.
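A minimal scatter-gather sketch, assuming a fixed count of 8 chunks; fastqsplitter is just one tool that can split into an exact number of output files, and the paths here are made up:

scattergather:
    fastq_split=8

rule split_fastq:
    input:
        "inputs/{sample}.fastq.gz",
    output:
        # scatter.fastq_split expands {scatteritem} to 1-of-8 ... 8-of-8;
        # any other wildcards have to be escaped with double braces.
        scatter.fastq_split("chunks/{{sample}}.{scatteritem}.fastq.gz"),
    shell:
        "fastqsplitter -i {input} $(printf ' -o %s' {output})"

rule gather_chunks:
    input:
        # Collects all 8 per-chunk results for this sample.
        gather.fastq_split("processed/{{sample}}.{scatteritem}.pairs.gz"),
    output:
        "merged/{sample}.pairs.gz",
    shell:
        "pairtools merge {input} -o {output}"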