Search code examples
nextflow

Combining output files from different process and used as input in next process


With reference to the previous asked question "https://stackoverflow.com/questions/75100652/nextflow-dsl2-how-to-combine-outputs-channels-from-multiple-processes-into-in?rq=1)) I have to combine the outputs from previous two processes and used them as an input to next process

process A {
  
  publishDir params.trim, mode : 'copy' 

  input:
  tuple val(pair_id), file(recal_readsbamfile)

  output:
  tuple val(pair_id),file(normal_samplefile), emit : SM_normalbam
  tuple val(pair_id),file(normal_samplefile_bai), emit : SM_normal_bai


  script:
  normal_samplefile = pair_id + '_recal_reads_SM.bam'
  normal_samplefile_bai = pair_id + '_recal_reads_SM.bai'

  """
    
  ./gatk --java-options "-Xmx24G" AddOrReplaceReadGroups --INPUT $recal_readsbamfile --OUTPUT $normal_samplefile --RGLB lib1 --RGPL illumina --RGPU NONE --RGSM NORMAL_$pair_id
  ./gatk --java-options "-Xmx24G" BuildBamIndex --INPUT $normal_samplefile

process B {
  
  publishDir params.trim, mode : 'copy' 

  input:
  tuple val(pair_id), file(recal_readsbamfile)

  output:
  tuple val(pair_id),file(tumor_samplefile), emit : SM_tumorbam
  tuple val(pair_id),file(tumor_samplefile_bai), emit : SM_tumorbam_bai


  script:
  tumor_samplefile = pair_id + '_recal_reads_SM.bam'
  tumor_samplefile_bai = pair_id + '_recal_reads_SM.bai'

  """
    
  ./gatk --java-options "-Xmx24G" AddOrReplaceReadGroups --INPUT $recal_readsbamfile --OUTPUT $tumor_samplefile --RGLB lib1 --RGPL illumina --RGPU NONE --RGSM TUMOR_$pair_id
  ./gatk --java-options "-Xmx24G" BuildBamIndex --INPUT $tumor_samplefile
  
process M {
 
    publishDir params.trim, mode : 'copy'

  input:
  tuple val(pair_id), file(tumor_samplefile),  file(normal_samplefile)

  output:
  tuple val(pair_id),file(flr2_file), emit : flr
  tuple val(pair_id),file(pon_file),  emit : pons

 script:

 flr2_file = pair_id + '_f1r2.tar.gz'
 pon_file =  pair_id + '_somatic_variants_PON.vcf' 

 """
./gatk --java-options "-Xmx24G"  Mutect2 --native-pair-hmm-threads 20 -R ${params.reference} ${tumor_samplefile} ${normal_samplefile --normal-sample ${normalRGSM} -pon ${params.ponsfile} -germline-resource ${params.germlineresource} -L ${params.intervalfile} --f1r2-tar-gz  ${name}.f1r2.tar.gz --output ${name}.mutect2.unfiltered.vcf

process A and process B have almost similar named output varies in a way that

  1. process A output - patient_1_T_recal_reads.bam
  2. process B output -patient_1_N_recal_reads.bam

i have to include both the files together in next process(M) in a way that -I patient_1_T_recal_reads.bam -I patient_1_N_recal_reads.bam -normal patient_1_N

i have followed the previous asked similar question but got stuck with new channel creation as i have multiple samples so is there any other way to proceed instead of giving path for every file.


Solution

  • Usually when you want to combine the items emitted from two channels, you're looking for the cartesian product. For this, you would want the combine operator. However, your first two processes are almost identical and could be refactored into a single definition which could then be included from another workflow script (using the include keyword). Unless you really need to prefix your sample names, I would just use them as-is and supply the normal sample when running Mutect2. For example:

    params.bam_files = './bam_files/*_recal.bam'
    params.outdir = './results'
    
    params.reference = './igenomes/Homo_sapiens/GATK/GRCh38/Sequence/WholeGenomeFasta/Homo_sapiens_assembly38.fasta'
    params.sequence_dict = './igenomes/Homo_sapiens/GATK/GRCh38/Sequence/WholeGenomeFasta/Homo_sapiens_assembly38.dict'
    params.germline_resource = './igenomes/Homo_sapiens/GATK/GRCh38/Annotation/GermlineResource/gnomAD.r2.1.1.GRCh38.PASS.AC.AF.only.vcf.gz'
    params.pon ='./igenomes/Homo_sapiens/GATK/GRCh38/Annotation/GATKBundle/1000g_pon.hg38.vcf.gz'
    params.intervals = './intervals.bed'
    
    process AddOrReplaceReadGroups {
    
        tag { sample }
    
        publishDir "${params.outdir}/AddOrReplaceReadGroups", mode: 'copy'
    
        input:
        tuple val(sample), path(bam)
        val library
        val platform
        val barcode
    
        output:
        tuple val(sample), path("${bam.baseName}_SM.bam{,.bai}")
    
        script:
        def avail_mem = task.memory ? "${task.memory.toGiga()}G" : "1G"
        def java_opts = "-XX:+UseSerialGC -Xmx${avail_mem}"
    
        """
        gatk --java-options "${java_opts}" AddOrReplaceReadGroups \\
            --INPUT "${bam}" \\
            --OUTPUT "${bam.baseName}_SM.bam" \\
            --RGLB "${library}" \\
            --RGPL "${platform}" \\
            --RGPU "${barcode}" \\
            --RGSM "${sample}"
        gatk --java-options "${java_opts}" BuildBamIndex \\
            --INPUT "${bam.baseName}_SM.bam" \\
            --OUTPUT "${bam.baseName}_SM.bam.bai"
        """
    }
    
    process Mutect2 {
    
        tag { pair_id }
    
        publishDir "${params.outdir}/Mutect2", mode: 'copy'
    
        input:
        tuple(
            val(pair_id),
            val(tumor_sample), path(indexed_tumor_bam),
            val(normal_sample), path(indexed_normal_bam),
        )
        path indexed_fasta
        path sequence_dict
        path germline_resource
        path pon
        path intervals
    
        output:
        tuple val(pair_id), path("${pair_id}.mutect2.vcf{,.idx}"), emit: vcf
        tuple val(pair_id), path("${pair_id}.f1r2.tar.gz"), emit: f1r2
    
        script:
        def avail_mem = task.memory ? "${task.memory.toGiga()}G" : "1G"
        def java_opts = "-XX:+UseSerialGC -Xmx${avail_mem}"
    
        """
        gatk --java-options "${java_opts}" Mutect2 \\
            --input "${indexed_tumor_bam.first()}" \\
            --input "${indexed_normal_bam.first()}" \\
            --output "${pair_id}.mutect2.vcf" \\
            --reference "${indexed_fasta.first()}" \\
            --sequence-dictionary "${sequence_dict}" \\
            --native-pair-hmm-threads ${task.cpus} \\
            --normal-sample "${normal_sample}" \\
            --panel-of-normals "${pon.first()}" \\
            --germline-resource "${germline_resource.first()}" \\
            --intervals "${intervals}" \\
            --f1r2-tar-gz "${pair_id}.f1r2.tar.gz"
        """
    }
    

    As you can see, my preference is to keep the BAM and index files together in the same channel. I also prefer basing my output files off of a file, rather than some string. This approach almost always guarantees a unique output filename so we don't accidentally clobber our inputs. An example workflow might looks like:

    workflow {
    
        Channel
            .fromFilePairs( params.bam_files, size: 1, flat: true )
            .tap { bam_files }
            .map { sample, bam ->
                pair_id = sample.substring(0, sample.lastIndexOf('_'))
    
                tuple( pair_id, sample )
            }
            .set { samples }
    
        reference = file( "${params.reference}{,.fai}" )
        sequence_dict = file( params.sequence_dict )
        germline_resource = file( "${params.germline_resource}{,.tbi}" )
        pon = file( "${params.pon}{,.tbi}" )
        intervals = file( params.intervals )
    
        AddOrReplaceReadGroups( bam_files, 'lib1', 'illumina', 'NONE' )
    
        samples
            .map { pair_id, sample -> tuple( sample, pair_id ) }
            .join( AddOrReplaceReadGroups.out )
            .branch { sample, pair_id, indexed_bam ->
    
                tumor: sample.endsWith('_T')
                    return tuple( pair_id, sample, indexed_bam )
    
                normal: sample.endsWith('_N')
                    return tuple( pair_id, sample, indexed_bam )
    
                other: true
            }
            .set { samples }
    
        Mutect2(
            samples.tumor.join( samples.normal ),
            reference,
            sequence_dict,
            germline_resource,
            pon,
            intervals,
        )
    }