
snakemake workflow broken by a rule that produces multiple files


Consider the following snakemake workflow (complete in this gist):

I have a predefined set of parameters that define my workflow lanes:

PAR={
  "id_list": range(1,10),
}

I need to stage the data, here simulated by creating files with random numbers:

rule stage:
  output: "in/{id}.old"
  shell: "echo $RANDOM > {output}"

I have a function that collects all the staged file names and an accompanying rule that aggregates the staging step:

def get_all_dat(wildcards):
  out=[]
  for i in PAR["id_list"]:
    dat=rules.stage.output[0].format(id=i)
    out.append(dat)
  return out

rule stage_all:
  input: get_all_dat
  output: "in/staged.list"
  shell: "for i in {input}; do echo $i; done > {output}"

I definitely do not need the get_all_dat function for something as simple as this example (an expand() on the input of stage_all would do), but I decided to include it here because it matches my actual workflow, where there are a few wildcards that all need to line up, which this function ensures.
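For reference, in this single-wildcard case the aggregation can be sketched in pure Python as the list that expand("in/{id}.old", id=PAR["id_list"]) would produce (a standalone sketch, not the multi-wildcard version the real workflow needs):

```python
# Pure-Python equivalent of expand("in/{id}.old", id=PAR["id_list"]).
PAR = {"id_list": range(1, 10)}

def all_dat(pattern="in/{id}.old"):
    # Substitute each id into the output pattern of rule stage.
    return [pattern.format(id=i) for i in PAR["id_list"]]

print(all_dat()[:2])  # ['in/1.old', 'in/2.old']
```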

Then comes the processing step:

rule process:
  input: 
    list="in/staged.list",
    util="process.sh"
  output: "out/{id}.new",
  shell: "./{input.util} $(cat {input.list})"

It takes the list of files produced by the stage_all rule and passes its contents to the process.sh script. That script makes some dummy change to in/{id}.old and writes the result to out/{id}.new; refer to the gist for the exact code.
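Since the gist is not reproduced here, a hypothetical minimal process.sh matching that description might look like this (the file names follow the rules above; the exact transformation is an assumption):

```shell
#!/usr/bin/env bash
# Hypothetical sketch of process.sh: for every in/<id>.old passed on the
# command line, write a transformed copy to out/<id>.new.
process_all() {
  mkdir -p out
  for f in "$@"; do
    id=$(basename "$f" .old)                    # in/3.old -> 3
    echo "processed: $(cat "$f")" > "out/${id}.new"
  done
}
process_all "$@"
```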

Crucially, this process reads all in/{id}.old files and creates all out/{id}.new files. It is here that the workflow lanes get mangled. As with the get_all_dat function, this "processing" is an example; the processing in my actual workflow cannot be broken into separate {id} lanes.

Next step is "plotting":

rule plot:
  input:  "out/{id}.new"
  output: "out/{id}.plot"
  shell: "echo \"plot of $(cat {input})\" > {output}"

... which gets its own aggregator (just like the staging step):

def get_all_plot(wildcards):
  out=[]
  for i in PAR["id_list"]:
    dat=rules.plot.output[0].format(id=i)
    out.append(dat)
  return out

rule plot_all:
  input: get_all_plot
  output: "out/plotted.list"
  shell: "for i in {input}; do echo $i; done > {output}"

The main problem with rule process is that each out/{id}.new file triggers a separate call to process.sh, so all instances concurrently read every in/{id}.old file and concurrently write every out/{id}.new file, which is not good. I've added some code to process.sh to count how many times the script is called; see the gist.

Things I've tried:

  1. using bash and lock files, as well as flock, to force the extra calls to wait for the lucky first process.sh thread to finish and then continue with no error;
  2. using directory("out") in the output: of rule process;
  3. adding an additional rule connecting out/{id}.new to the directory("out"):
    rule connector:
      input: "out",
      output: "out/{id}.new",

Consequences:

  1. race conditions galore: there's really no good way to ensure only one process.sh is executed, and snakemake deletes the out/{id}.new files (as it should) because it couldn't find them when the corresponding {id} process job was first invoked;
  2. the workflow gets broken because there's nothing connecting out/{id}.new to the directory("out");
  3. ChildIOException: File/directory is a child to another output:

My intention is to run the complete workflow with out/plotted.list as the target, with an arbitrary number of cores (which will all need to wait for the single process.sh run to finish). The reason is that the process step is cheap, while the plot steps are expensive and {id} can take many, many values.

Thanks for bearing with me through the long post.


Solution

  • What about this:

    rule process:
        input: 
            list="in/staged.list",
            util="process.sh",
        output: 
            touch('out/staged.list'),
        shell: 
            "./{input.util} $(cat {input.list})"
    
    rule plot:
        input:
            list='out/staged.list',
        output: 
            "out/{id}.plot"
        params:
            id= lambda wc: "out/%s.new" % wc.id,
        shell: 
            r"""
            echo "plot of $(cat {params.id})" > {output}
            """
    

    Rule process outputs a single dummy file, so it is executed only once, yet it still produces all the necessary out/{id}.new files.

    Rule plot takes the dummy file as input, which guarantees its real input is also present. The real input file is passed as a parameter instead.

    Downstream rules can then use "out/{id}.plot" as normal.
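The params lambda above receives the rule's wildcards object and builds the concrete path of the hidden real input. A minimal standalone illustration, using SimpleNamespace as a stand-in for Snakemake's wildcards object (an assumption made so the snippet runs outside Snakemake):

```python
from types import SimpleNamespace

# Same lambda as in the plot rule's params section.
id_param = lambda wc: "out/%s.new" % wc.id

# Simulate the wildcards object Snakemake would pass for id=3.
wc = SimpleNamespace(id="3")
print(id_param(wc))  # out/3.new
```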