Consider the following Snakemake workflow (complete in this gist):
I have a predefined set of parameters that define my workflow lanes:
PAR = {
    "id_list": range(1, 10),
}
I need to stage the data, here simulated by creating files with random numbers:
rule stage:
    output: "in/{id}.old"
    shell: "echo $RANDOM > {output}"
I have a function that collects all the staged file names and an accompanying rule that aggregates the staging step:
def get_all_dat(wildcards):
    out = []
    for i in PAR["id_list"]:
        dat = rules.stage.output[0].format(id=i)
        out.append(dat)
    return out

rule stage_all:
    input: get_all_dat
    output: "in/staged.list"
    shell: "for i in {input}; do echo $i; done > {output}"
I definitely do not need the get_all_dat function for something as simple as this example (expand on the input of stage_all would do the same), but I decided to include it here because it matches my actual workflow, where there are a few wildcards that all need to line up, which this function ensures.
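For reference, a plain-Python sketch (no snakemake needed) of the list that either form produces for the parameters above:

```python
# Plain-Python sketch of the list that get_all_dat builds; snakemake's
# expand("in/{id}.old", id=PAR["id_list"]) would yield the same list.
PAR = {"id_list": range(1, 10)}

def get_all_dat(wildcards=None):
    # mirrors rules.stage.output[0].format(id=i) for each id
    return ["in/{id}.old".format(id=i) for i in PAR["id_list"]]
```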
Then comes the processing step:
rule process:
    input:
        list="in/staged.list",
        util="process.sh"
    output: "out/{id}.new",
    shell: "./{input.util} $(cat {input.list})"
It takes the list of files coming from the stage_all rule and passes the contents to the process.sh script. This script essentially makes some dummy change to in/{id}.old and writes to out/{id}.new; refer to the gist for the exact code.
Crucially, this process reads all in/{id}.old files and creates all out/{id}.new files. It is here that the workflow lanes get mangled. As with the get_all_dat function, this "processing" is an example; the actual processing in my actual workflow cannot be broken into separate {id} lanes.
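The exact process.sh is in the gist; as a hypothetical stand-in, a script of this shape reproduces the key property that one single run reads every in/{id}.old and writes every out/{id}.new (the doubling transformation is made up for illustration):

```shell
#!/bin/bash
# Hypothetical stand-in for the gist's process.sh: one run handles all
# staged files passed as arguments, i.e. ./process.sh $(cat in/staged.list)
process_all() {
    mkdir -p out
    local f id
    for f in "$@"; do
        id=$(basename "$f" .old)
        # made-up dummy change: double the staged random number
        echo $(( $(cat "$f") * 2 )) > "out/${id}.new"
    done
}

# simulate the staging step, then process everything in one single call
mkdir -p in
for i in 1 2 3; do echo "$i" > "in/${i}.old"; done
process_all in/*.old
```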
Next step is "plotting":
rule plot:
    input: "out/{id}.new"
    output: "out/{id}.plot"
    shell: "echo \"plot of $(cat {input})\" > {output}"
... which gets its own aggregator (just like the staging step):
def get_all_plot(wildcards):
    out = []
    for i in PAR["id_list"]:
        dat = rules.plot.output[0].format(id=i)
        out.append(dat)
    return out

rule plot_all:
    input: get_all_plot
    output: "out/plotted.list"
    shell: "for i in {input}; do echo $i; done > {output}"
The main problem of rule process is that each out/{id}.new file will initiate a new call to process.sh, concurrently reading all in/{id}.old files and concurrently writing all out/{id}.new files, which is not good. I've added some code to process.sh to count how many times this script is called; see the gist.
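The gist has the exact counter; a minimal version of the idea (assuming a calls.log in the working directory, which is an invented name here) is just an append at the top of process.sh:

```shell
# Minimal sketch of an invocation counter (the gist's version may differ):
# every call of process.sh appends one line, so the number of lines in
# calls.log equals the number of times the script was started.
echo "process.sh called with $# args" >> calls.log
```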
Things I've tried:

1. waiting for the one process.sh thread to finish and then continuing with no error;
2. directory("out") in the output: of rule process;
3. connecting out/{id}.new to the directory("out"):

   rule connector:
       input: "out",
       output: "out/{id}.new",

Consequences:

1. process.sh is executed and snakemake deletes the out/{id}.new files (as it should) because it couldn't find them when the corresponding {id} process rule was first called;
2. no rule connects out/{id}.new to the directory("out");
3. ChildIOException: File/directory is a child to another output:
My intention is to run the complete workflow with out/plotted.list as target, with an arbitrary number of cores (which will all need to wait for the one process.sh thread to finish). The reason is that the process step is cheap, while the plot steps are expensive and {id} can have many, many values.

Thanks for bearing with me through the long post.
What about this:
rule process:
    input:
        list="in/staged.list",
        util="process.sh",
    output:
        touch('out/staged.list'),
    shell:
        "./{input.util} $(cat {input.list})"
rule plot:
    input:
        list='out/staged.list',
    output:
        "out/{id}.plot"
    params:
        id=lambda wc: "out/%s.new" % wc.id,
    shell:
        r"""
        echo "plot of $(cat {params.id})" > {output}
        """
Rule process gives as output a single dummy file, so it is executed only once, but it produces all the necessary files. Rule plot takes as input the dummy file from above, so its real input must also be present; the real input file is passed as a parameter. Subsequent rules should be able to use "out/{id}.plot" as normal.
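How the params lambda resolves can be seen in plain Python; the tiny stand-in wildcards object below is only for illustration (snakemake passes its own wildcards object):

```python
# Illustration of the params lambda from rule plot: given a wildcards
# object with an `id` attribute, it returns the path of the real input.
from types import SimpleNamespace

param_id = lambda wc: "out/%s.new" % wc.id

wc = SimpleNamespace(id="7")   # stand-in for snakemake's wildcards
print(param_id(wc))            # out/7.new
```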