I have a bash script that works fine in the terminal. It takes a text file from the user, reads it line by line, and runs an operation using the entry on each line as the search query.
#!/bin/bash

mkdir "final_loc_tf"

while read BED; do
    cat results/intersect/*/$BED | sortBed | bedtools merge -c 4 -o collapse > "final_loc_tf/${BED}_final.bed"
done < $1
I want to implement this in Nextflow, but without success. I have seen a couple of examples parsing CSV files but couldn't make them work on my text file.
Here is my Nextflow file attempting to implement the bash script. I am not able to parse the text file, and it doesn't give me any output files.
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.bed = file("results/all_bed.txt")

process MERGE {

    publishDir 'results/final', mode: 'copy', overwrite: false

    input:
    file file_list

    output:
    path '*'

    script:
    """
    bash test.sh $file_list
    """
}

workflow {
    bed_ch = params.bed
    MERGE(bed_ch)
}
Thanks for your help.
Nextflow processes are executed independently and are isolated from each other inside the working directory (i.e. ./work). If you have a look at the .command.err inside your process working directory, you'll likely see errors like:

cat: results/intersect/*/<bed>: No such file or directory
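
If you're not sure where to look, you can query the run history with nextflow log to find each task's working directory. A minimal sketch (the run name and work path are placeholders; substitute your own):

nextflow log                                        # list previous runs and their run names
nextflow log <run_name> -f 'name,status,workdir'    # show the work directory of each task
cat <workdir>/.command.err                          # inspect the captured stderr for the failed task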
This is because the results directory (containing the BED files beneath it, i.e. results/intersect/*/$BED) has not been staged into the process working directory. The reason you didn't see an error in your Nextflow output is that, by default, the return value of a pipeline is the exit status of the last command. In your pipeline, the last command (i.e. bedtools merge) probably completes successfully and returns exit status zero. The option you want to add is called 'pipefail'; from man bash:
pipefail
    If set, the return value of a pipeline is the value of the last (rightmost) command to exit with a non-zero status, or zero if all commands in the pipeline exit successfully. This option is disabled by default.
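
To see what that means in practice, here's a minimal sketch (missing.bed is a deliberately non-existent file):

cat missing.bed | sort | head
echo $?    # prints 0: head succeeded, so cat's failure is masked

set -o pipefail
cat missing.bed | sort | head
echo $?    # non-zero: the failing cat now determines the pipeline's exit status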
So to ensure that your pipeline does not mask errors, you can add the following to your nextflow.config to have it applied to all processes:
process {
    shell = [ '/bin/bash', '-euo', 'pipefail' ]
}
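
If you'd rather scope this to a single process instead of the whole workflow, the same settings can also be given with the process-level shell directive. A minimal sketch:

process MERGE {

    // applies the strict Bash settings to this process only
    shell '/bin/bash', '-euo', 'pipefail'

    // ... inputs, outputs and script as before
}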
And to ensure that your workflow is portable, you will need some way to stage each of the required input files into the process working directory. A better way to do what you want would be to pass in the files to be merged explicitly. One way is to use the fromFilePairs factory method, which gives you a grouping key. We can handle multiple input files by using a wildcard (i.e. *) to control the names of the staged files, for example:
params.bed_files = './path/to/*/*.bed'
params.outdir = './results'

process MERGE {

    tag { sample }

    publishDir "${params.outdir}/MERGE", mode: 'copy'

    input:
    tuple val(sample), path('bed_files/*.bed')

    output:
    tuple val(sample), path("${sample}_final.bed")

    """
    cat bed_files/*.bed |
        sortBed |
        bedtools merge -c 4 -o collapse \\
        > "${sample}_final.bed"
    """
}

workflow {
    bed_ch = Channel.fromFilePairs( params.bed_files, size: -1 )

    MERGE(bed_ch)
}
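
To run this against your existing layout, you could point params.bed_files at the intersect results (the single quotes stop Bash from expanding the glob; main.nf is just an assumed file name here):

nextflow run main.nf --bed_files 'results/intersect/*/*.bed'

And if you want to check what the channel emits before wiring it into MERGE, a quick view() in the workflow block prints one tuple per grouping key; the output below is illustrative:

Channel.fromFilePairs( params.bed_files, size: -1 ).view()

// e.g. [TF1, [results/intersect/sampleA/TF1.bed, results/intersect/sampleB/TF1.bed]]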