I'm new to Nextflow, channels and processes, so kindly bear with me. I'd like to create a pipeline that passes paired reads from one process to another, but I'm unable to get it to work.
`script1.nf`:

```
params.outdir = 'results'
params.reads1 = "/Users/user/Downloads/tiny/normal/*_R1_xxx.fastq.gz"
params.reads2 = "/Users/user/Downloads/tiny/normal/*_R2_xxx.fastq.gz"

println "reads: $params.reads1"
println "reads: $params.reads2"

Channel.fromPath(params.reads1, checkIfExists: true).view()
Channel.fromPath(params.reads2, checkIfExists: true).view()

include { fastp } from '/Users/name/Documents/name/nextflow_scripts/fastp.nf'

fqpairs_ch = channel.fromFilePairs('/Users/name/Downloads/tiny/normal/*_R{1,2}_xxx.fastq.gz')

process check {
    output:
    stdout

    """
    echo "hello from command line \n"
    """
}

workflow {
    check().view(). /// this works
    fastp(fqpairs_ch).view() // it gives error
}
```
`fastp.nf`:

```
params.outdir = "./results"

process fastp() {
    tag "$sample_id"

    input:
    tuple val(sample_id), file(x), file(x1) from fqpairs_ch

    output:
    println ("$sample_id")

    script:
    """
    mkdir fastp_trimmed
    fastp \
        --correction \
        --cut_tail \
        --disable_trim_poly_g \
        --length_required 50 \
        --qualified_quality_phred 20 \
        --thread 12 \
        --trim_poly_x \
        --unqualified_percent_limit 20 \
        -i ${x[0]} -I ${x[1]} \
        -o fastp_trimmed/joinedfiles.dat_${x[0]} -O fastp_trimmed/joinedfiles.dat_${x[1]}
    """
}
```
Error:

```
ERROR ~ No such variable: fqpairs_ch

 -- Check script 'fastp.nf' at line: 13 or see '.nextflow.log' file for more details
```
I can't wrap my head around `tag` and `val`. I also find it difficult to understand how to pass a variable (here, the `fqpairs_ch` channel) from one script to another. I've watched YouTube tutorials, but with limited success.
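The `include` keyword can be used to import functions, processes and other workflows into your main workflow script. Note that with DSL2, the `from` and `into` channel declarations should be omitted: a process receives its inputs as the arguments you pass when you call it in the `workflow` block. Each included script also has its own variable scope, so `fqpairs_ch`, defined in `script1.nf`, is not visible inside `fastp.nf`; that is why you get `No such variable: fqpairs_ch`. Here's one way to do it, tested using Conda.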
Contents of `main.nf`:

```
params.reads = '/Users/name/Downloads/tiny/normal/*_R{1,2}_xxx.fastq.gz'

include { fastp } from './modules/fastp'

workflow {

    read_pairs = Channel.fromFilePairs( params.reads )

    fastp( read_pairs )

    fastp.out.reads.view()
}
```
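To see what `Channel.fromFilePairs` hands to `fastp`, here is a rough shell sketch of the grouping for this particular glob. It is only an emulation, not how Nextflow implements it: the helper name `pair_reads` is hypothetical, and it hard-codes the `_R1_xxx.fastq.gz` and `_R2_xxx.fastq.gz` suffixes from the pattern above. Each printed line corresponds to one channel item `[sample_id, [R1, R2]]`, which is the shape the `tuple val(sample_id), path(reads)` input declaration expects.

```shell
#!/bin/sh
# Rough emulation (NOT Nextflow's implementation) of how
# Channel.fromFilePairs groups files for the pattern *_R{1,2}_xxx.fastq.gz.
# pair_reads is a hypothetical helper name used only for illustration.

# pair_reads DIR: print "sample_id R1 R2" for each complete pair in DIR.
pair_reads() (
    cd "$1" || exit 1
    for r1 in *_R1_xxx.fastq.gz; do
        [ -e "$r1" ] || continue              # glob matched nothing
        sample_id=${r1%_R1_xxx.fastq.gz}      # strip the R1 suffix -> grouping key
        r2=${sample_id}_R2_xxx.fastq.gz
        if [ -e "$r2" ]; then                 # skip samples without a mate
            printf '%s %s %s\n' "$sample_id" "$r1" "$r2"
        fi
    done
)

# Usage: fake (empty) read files in a temporary directory.
demo_dir=$(mktemp -d)
for s in foo bar baz; do
    touch "$demo_dir/${s}_R1_xxx.fastq.gz" "$demo_dir/${s}_R2_xxx.fastq.gz"
done
touch "$demo_dir/orphan_R1_xxx.fastq.gz"      # R1 only, so it is not paired
pair_reads "$demo_dir"
rm -rf "$demo_dir"
```

The sample IDs `foo`, `bar` and `baz` here match the task labels in the run log further down; the orphan R1 file is skipped because this sketch only reports complete pairs.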
Contents of `modules/fastp/main.nf`:

```
process fastp {

    tag { sample_id }

    input:
    tuple val(sample_id), path(reads, stageAs: 'input_reads/*')

    output:
    tuple val(sample_id), path("${sample_id}_{1,2}.trimmed.fastq.gz"), emit: reads
    tuple val(sample_id), path("fastp.html"), emit: html
    tuple val(sample_id), path("fastp.json"), emit: json

    script:
    def (fq1, fq2) = reads
    """
    fastp \\
        --correction \\
        --cut_tail \\
        --disable_trim_poly_g \\
        --length_required 50 \\
        --qualified_quality_phred 20 \\
        --thread ${task.cpus} \\
        --trim_poly_x \\
        --unqualified_percent_limit 20 \\
        -i "${fq1}" \\
        -I "${fq2}" \\
        -o "${sample_id}_1.trimmed.fastq.gz" \\
        -O "${sample_id}_2.trimmed.fastq.gz"
    """
}
```
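On `tag` and `val`: each channel item such as `[foo, [foo_R1_xxx.fastq.gz, foo_R2_xxx.fastq.gz]]` is unpacked by the `tuple` input, so `val(sample_id)` receives the plain string `foo` and `path(reads, ...)` receives the two files. `tag { sample_id }` only labels the task, which is why the log shows `fastp (foo)`. The sketch below uses a hypothetical shell function, `render_fastp_cmd`, to print (not run) the command the script block would produce for one item, assuming the inputs are staged as `input_reads/<original name>` and that `task.cpus` is 12 as set in `nextflow.config`.

```shell
#!/bin/sh
# Illustration only: print (do NOT run) the fastp command that the module's
# script block would render for one channel item. render_fastp_cmd is a
# hypothetical helper; the staged paths and CPU count are assumptions.

# render_fastp_cmd SAMPLE_ID R1 R2 CPUS
render_fastp_cmd() (
    sample_id=$1   # from val(sample_id): a plain string such as "foo"
    fq1=$2         # first element of path(reads), i.e. fq1 in the module
    fq2=$3         # second element of path(reads), i.e. fq2 in the module
    cpus=$4        # what ${task.cpus} would expand to
    printf 'fastp --correction --cut_tail --disable_trim_poly_g'
    printf ' --length_required 50 --qualified_quality_phred 20'
    printf ' --thread %s --trim_poly_x --unqualified_percent_limit 20' "$cpus"
    printf ' -i "%s" -I "%s"' "$fq1" "$fq2"
    printf ' -o "%s_1.trimmed.fastq.gz" -O "%s_2.trimmed.fastq.gz"\n' \
        "$sample_id" "$sample_id"
)

# The item [foo, [foo_R1_xxx.fastq.gz, foo_R2_xxx.fastq.gz]] after staging:
render_fastp_cmd foo \
    input_reads/foo_R1_xxx.fastq.gz input_reads/foo_R2_xxx.fastq.gz 12
```

Only `sample_id` ends up in the output names, which is what lets the `path("${sample_id}_{1,2}.trimmed.fastq.gz")` output declaration find the files again.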
Contents of `nextflow.config`:

```
params {
    outdir = './results'
}

process {
    withName: fastp {
        publishDir = [
            [
                path: "${params.outdir}/fastp/reads",
                pattern: "*.fastq.gz",
                mode: 'copy',
            ],
            [
                path: "${params.outdir}/fastp/reports",
                pattern: "fastp.{html,json}",
                mode: 'copy',
            ],
        ]
        cpus = 12
        conda = 'fastp=0.23.4=hadf994f_1'
    }
}

conda {
    enabled = true
}
```
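The two `publishDir` entries route outputs by file name: anything matching `*.fastq.gz` is copied to `results/fastp/reads`, and `fastp.html`/`fastp.json` go to `results/fastp/reports`. A hypothetical shell helper, `publish_target`, mirrors that routing with a `case` statement (Nextflow does the matching itself; this only illustrates the two patterns). One thing it makes visible: every task names its reports `fastp.html` and `fastp.json`, so the reports from different samples map to the same destination path. fastp's `-h`/`-j` options can give them per-sample names if you need to keep them all.

```shell
#!/bin/sh
# Illustration only: mirror the two publishDir patterns from nextflow.config.
# publish_target is a hypothetical helper; Nextflow does this matching itself.

# publish_target OUTDIR FILE: print where FILE would be copied, if anywhere
publish_target() {
    case $2 in
        *.fastq.gz)            printf '%s/fastp/reads\n' "$1" ;;    # pattern "*.fastq.gz"
        fastp.html|fastp.json) printf '%s/fastp/reports\n' "$1" ;;  # pattern "fastp.{html,json}"
        *)                     printf 'not published\n' ;;
    esac
}

# Outputs of the fastp task for sample foo
for f in foo_1.trimmed.fastq.gz foo_2.trimmed.fastq.gz fastp.html fastp.json; do
    printf '%s -> %s\n' "$f" "$(publish_target ./results "$f")"
done
```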
Results:

```
$ nextflow run main.nf -ansi-log false
N E X T F L O W ~ version 23.04.1
Launching `main.nf` [special_bhabha] DSL2 - revision: 6e2f1c4478
Creating env using conda: fastp=0.23.4=hadf994f_1 [cache /path/to/work/conda/env-069bf10c907949d763dee91ac82cdee5]
[cf/1cbcdb] Submitted process > fastp (baz)
[ae/3f33c5] Submitted process > fastp (bar)
[8a/37deb8] Submitted process > fastp (foo)
[bar, [/path/to/work/ae/3f33c5a956b91d0e61b7c42f059249/bar_1.trimmed.fastq.gz, /path/to/work/ae/3f33c5a956b91d0e61b7c42f059249/bar_2.trimmed.fastq.gz]]
[baz, [/path/to/work/cf/1cbcdbb27ad021bbc3e40fbb4892b1/baz_1.trimmed.fastq.gz, /path/to/work/cf/1cbcdbb27ad021bbc3e40fbb4892b1/baz_2.trimmed.fastq.gz]]
[foo, [/path/to/work/8a/37deb820eb932b790a2806c86b0cd6/foo_1.trimmed.fastq.gz, /path/to/work/8a/37deb820eb932b790a2806c86b0cd6/foo_2.trimmed.fastq.gz]]
```
I've used the `stageAs` input file/path option to ensure we never (accidentally) clobber our input files when `fastp` is run. Specifying an output filename that ends with `.gz` also ensures we always produce gzip-compressed output. If we had instead used the original filename as a suffix, we could potentially produce uncompressed output if our input files were also uncompressed (i.e. they do not end with `.gz`), which might be unexpected.
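Both points can be checked from the shell. The first part of this sketch shows why a name collision with a staged input is risky: Nextflow stages inputs as symlinks by default, and opening an existing symlinked path for writing overwrites the file it points to. The second part defines a hypothetical `is_gzip` helper that tests for the gzip magic bytes (`1f 8b`), which you could run against the `*.trimmed.fastq.gz` files in a task's work directory. Both parts use throwaway files in a temporary directory, not real reads.

```shell
#!/bin/sh
# Illustration only, using throwaway files in a temporary directory.
demo=$(mktemp -d)

# Part 1: writing to a symlinked path overwrites the file it points to.
# Nextflow stages inputs as symlinks by default, so an output that reused
# a staged input's name could clobber the original reads.
printf 'original reads\n' > "$demo/source.fastq"
ln -s "$demo/source.fastq" "$demo/staged.fastq"   # like a staged input
printf 'tool output\n' > "$demo/staged.fastq"      # output with the same name
cat "$demo/source.fastq"                           # now prints "tool output"

# Part 2: is_gzip (hypothetical helper) succeeds if FILE starts with the
# gzip magic bytes 1f 8b.
is_gzip() {
    [ "$(head -c 2 "$1" | od -A n -t x1 | tr -d ' \n')" = "1f8b" ]
}

printf 'ACGT\n' | gzip > "$demo/reads.fastq.gz"
printf 'ACGT\n' > "$demo/reads.fastq"
if is_gzip "$demo/reads.fastq.gz"; then echo "reads.fastq.gz: gzip"; fi
if ! is_gzip "$demo/reads.fastq"; then echo "reads.fastq: plain text"; fi

rm -rf "$demo"
```

Note that a file name ending in `.gz` says nothing by itself; checking the first two bytes is what tells you whether the content is actually gzip-compressed.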