I have three snakemake rules (other than rule all) and cannot figure this problem out, despite reading up on the checkpoint resources.
Rule 1 takes my first and only starting file and produces x output files (the number varies depending on the input file). Each of those x outputs needs to be processed separately in rule 2, meaning rule 2 runs x jobs. However, only some subset y of those jobs will produce output files (the software only writes files for inputs that pass a certain threshold). I want each of those outputs to run as a separate job in rule 3, but I don't know how many files will come out of rule 2. Rule 3 will therefore also run y jobs, one for each successful output from rule 2. I have two questions. First, how do I write the input for rule 3 without knowing how many files will come out of rule 2? Second, how can I "tell" rule 2 it is done when the number of output files does not match the number of input files? If I added a fourth rule, I imagine it would try to re-run rule 2 on the jobs that didn't get an output file, which would never produce one. Maybe I am missing something in setting up the checkpoints?
something like:
rule a:
    input: 'file.vcf'
    output: 'dummy.txt'
    shell:
        """
        .... make unknown number of output files (x): x_1, x_2, ..., x_n
        """

# run a separate job from each output of rule a
rule b:
    input: 'x_1'   # not sure how many inputs there will be here
    output: 'y_1'  # not sure how many output files there will be here
    shell:
        """
        some of the x inputs will produce their corresponding y, but others will have no output
        """

# run a separate job for each output of rule b
rule c:
    input: 'y_1'   # not sure how many input files there will be here
    output: 'z_1'
Answer
You should change rule a to a checkpoint, as mentioned in the comments. Rule b will generate one output for each input and can be left as is, and the same goes for rule c in this example.
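Concretely, the only syntactic changes to rule a are the checkpoint keyword and a directory() output, since the number of files it will create isn't known ahead of time. A minimal sketch, where your_command is a hypothetical stand-in for whatever actually generates the files:

checkpoint a:
    input: 'file.vcf'
    # directory() tells snakemake "some unknown number of files will appear in here"
    output: directory('output_dir')
    shell: "mkdir -p {output} && your_command {input} {output}"  # your_command is a placeholder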
Eventually, you will have an aggregate-like rule that decides which outputs are required. It could be rule d, or it may end up being rule all. Either way, the aggregate rule needs an input function that invokes the checkpoint to determine which files are present. Following the example, you would have something like:
import os

checkpoint a:
    input: 'file.vcf'
    output: directory('output_dir')
    shell:
        """
        mkdir {output}  # then put all the output files here!
        .... make unknown number of output files (x): x_1.txt, x_2.txt, ..., x_n.txt
        """

# run a separate job from each output of rule a
rule b:
    input: 'output_dir/x_{n}.txt'
    output: 'y_{n}.txt'
    shell:
        """
        some of the x inputs will produce their corresponding y, but others will have no output
        """

# run a separate job for each output of rule b
rule c:
    input: 'y_{n}.txt'
    output: 'z_{n}.txt'

# input function for the rule aggregate
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.a.get(**wildcards).output[0]
    return expand("z_{i}.txt",
                  i=glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i)

rule aggregate:
    # what do you do with all the z files? this could be rule all
    input: aggregate_input
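If it helps, here is a self-contained, runnable sketch of the same pattern. Every file name and shell command is a stand-in invented for the demo (a RANDOM-based loop fakes the "unknown number of outputs"); your real tools go in their place:

import os

rule all:
    input: 'aggregated.txt'

checkpoint a:
    output: directory('output_dir')
    shell:
        """
        mkdir -p {output}
        # create between 1 and 5 files to simulate a variable number of outputs
        for i in $(seq 1 $((RANDOM % 5 + 1))); do
            echo "$i" > {output}/x_$i.txt
        done
        """

rule b:
    input: 'output_dir/x_{n}.txt'
    output: 'y_{n}.txt'
    shell: 'cp {input} {output}'

rule c:
    input: 'y_{n}.txt'
    output: 'z_{n}.txt'
    shell: 'cp {input} {output}'

def aggregate_input(wildcards):
    # pause DAG evaluation here until checkpoint a has run
    checkpoint_output = checkpoints.a.get(**wildcards).output[0]
    return expand('z_{i}.txt',
                  i=glob_wildcards(os.path.join(checkpoint_output, 'x_{i}.txt')).i)

rule aggregate:
    input: aggregate_input
    output: 'aggregated.txt'
    shell: 'cat {input} > {output}'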
If you think of your workflow as a tree, rule a is a branch point with a variable number of branches. Rules b and c are one-to-one mappings. Aggregate brings all the branches back together AND is responsible for checking how many branches are present. Rules b and c only see one input and one output each and don't care how many other branches there are.
EDIT to answer the question in the comments, and to fix several bugs in my code:
"I still get confused here though, because rule b will not have as many outputs as inputs, so won't rule aggregate never run until all of the wildcards from the output of checkpoint a are present in z_{n}, which they never would be?"
This is confusing because it's not how snakemake usually works, and it leads to a lot of questions on SO. What you need to remember is that when checkpoints.<rule>.get is run, the evaluation of that step effectively pauses. Consider the simple case of three values, i == [1, 2, 3], where only i == 2 and 3 are created by checkpoint a. We know the DAG will look like this:
rule        file
input       file.vcf
              /   \
a          x_2   x_3
            |     |
b          y_2   y_3
            |     |
c          z_2   z_3
              \   /
aggregate   OUTPUT
Here, x_1 is missing from checkpoint a's output. But snakemake doesn't know how checkpoint a will behave, just that it will make a directory as output and that (because it is a checkpoint) the DAG will be reevaluated once it has completed. So if you ran snakemake -nq, you would see that checkpoint a and aggregate will run, but no mention of b or c. At that point, those are the only rules snakemake knows about and plans to run. Calling checkpoints.<rule>.get basically says "wait here; after this rule you will have to see what was made".
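As an aside, the **wildcards in checkpoints.a.get(**wildcards) matters once the consuming rule has wildcards of its own: get needs concrete values to know which instance of the checkpoint to wait for. A hypothetical sketch, assuming a {sample} wildcard that the example above doesn't actually have:

import os

def aggregate_input(wildcards):
    # "wait here" until checkpoint a has finished for this particular sample
    out_dir = checkpoints.a.get(sample=wildcards.sample).output[0]
    return expand("z_{sample}_{i}.txt",
                  sample=wildcards.sample,
                  i=glob_wildcards(os.path.join(out_dir, "x_{i}.txt")).i)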
So when snakemake first starts running your workflow, the DAG looks like this:
rule        file
input       file.vcf
              |
a            ...
              |
????         ...
              |
aggregate   OUTPUT
Snakemake doesn't know what goes between rule a and aggregate, just that it needs to run a before it can tell.
rule        file
input       file.vcf
              /   \
a          x_2   x_3
????         ...
              |
aggregate   OUTPUT
Checkpoint a gets scheduled and run, and now the DAG is reevaluated. The rest of aggregate_input looks at the files that are present with glob_wildcards, then uses that information to decide which files it needs. Note that the expand is requesting outputs from rule c, which requires rule b, which requires the x_{n} files, which exist now that the checkpoint has run. Now snakemake can construct the DAG you expect.
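To make that concrete with the two-file example, here is what those calls evaluate to when only x_2.txt and x_3.txt exist in output_dir (results shown as comments):

from snakemake.io import expand, glob_wildcards

found = glob_wildcards("output_dir/x_{i}.txt").i  # ["2", "3"]
expand("z_{i}.txt", i=found)                      # ["z_2.txt", "z_3.txt"]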
Here’s the input function with more comments to hopefully make it clear:
def aggregate_input(wildcards):
    # say this rule depends on a checkpoint: DAG evaluation pauses here
    checkpoint_output = checkpoints.a.get(**wildcards).output[0]
    # at this point, checkpoint a has completed and its output (the directory)
    # is in checkpoint_output, with some number of files inside it

    # use glob_wildcards to find the x_{i} files that actually exist
    found_files = glob_wildcards(os.path.join(checkpoint_output, "x_{i}.txt")).i
    # now we know we need a z file to be created for every x file that exists
    return expand("z_{i}.txt", i=found_files)
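One loose end from the original question: if rule b truly writes nothing for inputs below the threshold, the pattern above still expects a y file for every x. A workaround I'd suggest (not part of the pattern above) is to make rule b always produce its output, empty if need be, using snakemake's touch() output flag; downstream rules can then skip empty files. A minimal sketch, with some_tool as a hypothetical placeholder:

rule b:
    input: 'output_dir/x_{n}.txt'
    # touch() creates/updates the output file after the job finishes,
    # so the job counts as complete even if some_tool wrote nothing
    output: touch('y_{n}.txt')
    shell: "some_tool {input} > {output} || true"  # some_tool may legitimately produce no output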