I'm in a situation, where I would like to scatter my workflow into a variable number of chunks, which I don't know beforehand. Maybe it is easiest to explain the problem by being concrete:
Someone has handed me FASTQ files demultiplexed using bcl2fastq
with the no-lane-splitting
option. I would like to split these files according to lane, map each lane individually, and then finally gather everything again. However, I don't know the number of lanes beforehand.
Ideally, I would like a solution like this,
rule split_fastq_file: (...) # results in N FASTQ files
rule map_fastq_file: (...) # do this N times
rule merge_bam_files: (...) # merge the N BAM files
but I am not sure this is possbile. The expand
function requires me to know the number of lanes, and can't see how it would be possible to use wildcards for this, either.
I should say that I am rather new to Snakemake, and that I may have complete misunderstood how Snakemake works. It has taken me some time to get used to think about things "upside-down" by focusing on output files and then working backwards.
One option is to use checkpoint
when splitting the fastqs, so that you can dynamically re-evaluate the DAG at a later point to get the resulting lanes.
Here's an MWE step by step:
# Requires Python 3.6+ for f-strings, Snakemake 5.4+ for checkpoints
import pathlib
import random
random.seed(1)
rule make_fastq:
output:
fastq = touch("input/{sample}.fastq")
checkpoint
, rather than a rule, so that we can later access the result. Also, we declare the output here as a directory specific to the sample, so that we can later glob in it to get the lanes that were created. checkpoint split_fastq:
input:
fastq = rules.make_fastq.output.fastq
output:
lane_dir = directory("temp/split_fastq/{sample}")
run:
pathlib.Path(output.lane_dir).mkdir(exist_ok=True)
n_lanes = random.randrange(1, 10)-
lane_numbers = random.sample(range(1, 10), k = n_lanes)
for lane_number in lane_numbers:
path = pathlib.Path(output.lane_dir) / f"L00{lane_number}.fastq"
path.touch()
rule map_fastq:
input:
fastq = "temp/split_fastq/{sample}/L00{lane_number}.fastq"
output:
bam = "temp/map_fastq/{sample}/L00{lane_number}.bam"
run:
bam = pathlib.Path(output.bam)
bam.parent.mkdir(exist_ok=True)
bam.touch()
split_fastq
, so that we can do a dynamic expand
on these. We do the expand
on the last rule in the chain of intermediate processing steps, in this case map_fastq
, so that we ask for the correct inputs.def get_bams(wildcards):
lane_dir = checkpoints.split_fastq.get(**wildcards).output[0]
lane_numbers = glob_wildcards(f"{lane_dir}/L00{{lane_number}}.fastq").lane_number
bams = expand(rules.map_fastq.output.bam, **wildcards, lane_number=lane_numbers)
return bams
rule merge_bam:
input:
get_bams
output:
bam = "temp/merge_bam/{sample}.bam"
shell:
"cat {input} > {output.bam}"
This example runs, and with random.seed(1)
happens to create three lanes (l001
, l002
, and l005
).
If you don't want to use checkpoint
, I think you could achieve something similar by creating an input function for merge_bam
that opens up the original input fastq, scans the read names for lane info, and predicts what the input files ought to be. This seems less robust, however.