
How to obtain files from a directory path channel in nextflow


I have a process that outputs a directory, and I want to pass all the files in that directory to BLAST. How can I do that?

The error is in how I am passing information through the workflow, namely in the construction of ch_forBlast:

workflow {
    ...
    ch_consensus = CONSENSUS_BUILDING(ch_trimmed.fastq)
    ch_forBlast = Channel.fromPath("${ch_consensus.consensusDir}/*.fasta")
    ch_blast = BLAST(ch_forBlast, db_dir)
}

But I get either an empty file or nothing at all. I don't think I understand how to go from a path channel to the files inside it, and I haven't been able to find out how to do that. Any insight?

These are the relevant parts of the workflow...any suggestions appreciated. I edited a bit based on the very helpful comments from @dthorbur:


process CONSENSUS_BUILDING {
//THIS WORKS FINE
    publishDir "${params.outdir}/consensus", mode: 'copy'

    input:
    path fastq

    output:
    path "${fastq.getSimpleName()}_ID/*.fasta", emit: consensus_fastas

    script:
    """
    gunzip -f ${fastq}
    NGSpeciesID --ont --sample_size 10000 --consensus --racon --racon_iter 3 --fastq ${fastq.getSimpleName()}.fastq --m 650 --s 50 --outfolder ${fastq.getSimpleName()}_ID
    """
}

process BLAST {
//THIS WORKS FINE IN ISOLATION
    publishDir "${params.outdir}/blast", mode: 'copy'
    container  'docker://ncbi/blast'

    input:
    path fasta
    path db

    output:
    path "${fasta.getParent()}_${fasta.getSimpleName()}.blast"

    script:
    """
    blastn -db $db/$db_name -query $fasta -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out ${fasta.getParent()}_${fasta.getSimpleName()}.blast
    """
}

// Overall workflow
workflow {
    ...
    ch_consensus = CONSENSUS_BUILDING(ch_trimmed.fastq)
    ch_forBlast = Channel.fromPath("${ch_consensus.consensusDir}/*.fasta")
    ch_blast = BLAST(CONSENSUS_BUILDING.out.consensus_fastas, db_dir)
}

But I am getting:

error [nextflow.exception.ProcessUnrecoverableException]: Process BLAST (1) script contains error(s)
Nov-17 11:01:12.310 [Actor Thread 9] ERROR nextflow.processor.TaskProcessor - Error executing process > 'BLAST (1)'

Caused by: No signature of method: nextflow.util.BlankSeparatedList.getParent() is applicable for argument types: () values: [] Possible solutions: getTarget(), setTarget(java.util.List), getAt(int), getAt(groovy.lang.EmptyRange), getAt(java.lang.Number), getAt(groovy.lang.Range) -- Check script 'bleePipelineBlast.nf' at line: 114

Source block: """ blastn -db $db/$db_name -query $fasta -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out ${fasta.getParent()}_${fasta.getSimpleName()}.blast """

Tip: view the complete command output by changing to the process work dir and entering the command cat .command.out
Nov-17 11:01:12.312 [Actor Thread 9] DEBUG nextflow.Session - Session aborted -- Cause: Process BLAST (1) script contains error(s)


Solution

  • Hard to say without seeing the .nextflow.log file, but my guess is that the workflow evaluates "${ch_consensus.consensusDir}/*.fasta" as soon as it starts and finds nothing, since nothing tells Channel.fromPath to wait for CONSENSUS_BUILDING to finish.

    I suspect you can fix this with a more appropriate output declaration. I'm unfamiliar with NGSpeciesID, but I'm going to assume the output is a directory with consensus fastas. You can adjust the process from there.
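
    If the process really does emit a directory, as the question's title suggests, one way to defer the file listing until the directory exists is to expand it inside a channel operator. A minimal sketch, assuming DSL2; MAKE_DIR and its demo files are hypothetical stand-ins for CONSENSUS_BUILDING and are not part of the original pipeline:

    ```nextflow
    // Hypothetical toy process: writes two FASTA files into a directory
    // and emits the directory itself.
    process MAKE_DIR {
        output:
        path "demo_ID", emit: consensusDir

        script:
        """
        mkdir demo_ID
        printf '>a\\nACGT\\n' > demo_ID/a.fasta
        printf '>b\\nTTGA\\n' > demo_ID/b.fasta
        """
    }

    workflow {
        MAKE_DIR()

        // The closure runs only after MAKE_DIR has emitted the directory,
        // unlike Channel.fromPath, which is evaluated during workflow setup.
        MAKE_DIR.out.consensusDir
            .flatMap { dir -> dir.listFiles().findAll { it.name.endsWith('.fasta') } }
            .view()
    }
    ```

    The resulting channel emits one FASTA file per item, so it can be passed to a downstream process that expects a single file.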

    EDIT: Adjusting the script based on the comment from OP.

    process CONSENSUS_BUILDING {
        publishDir "${params.outdir}/consensus", mode: 'copy'
    
        input:
        path fastq
    
        output:
        path "${fastq.getSimpleName()}.fastq", emit: fastq // Are you emitting the same fastq you used as input? You can use queue channels multiple times with DSL2, so this may be unnecessary. 
        path "${fastq.getSimpleName()}_ID/*.fasta", emit: consensus_fastas
    
        script:
        """
        gunzip -f ${fastq}
        NGSpeciesID --ont --sample_size 10000 --consensus --racon --racon_iter 3 --fastq ${fastq.getSimpleName()}.fastq --m 650 --s 50 --outfolder ${fastq.getSimpleName()}_ID
        """
    }
    
    process BLAST {
        publishDir "${params.outdir}/blast", mode: 'copy'
        container  'docker://ncbi/blast'
    
        input:
        tuple val(sampleID), path(fasta) // sampleID is a plain string (the file's simple name), so it is a val, not a path
        path db
    
        output:
        path "${sampleID}.blast"
    
        script:
        """
        blastn -db $db/$db_name -query $fasta -outfmt '6 sseqid stitle pident length mismatch gapopen qstart qend sstart send evalue bitscore' -out ${sampleID}.blast
        """
    }
    
    
    workflow {
        ...
        CONSENSUS_BUILDING(ch_trimmed.fastq)
        CONSENSUS_BUILDING
          .out
          .consensus_fastas
          .flatten() // a glob output emits a list when several files match; emit them one at a time
          .map{ it -> tuple(it.simpleName, it) }
          .set{ blast_in }
    
        BLAST(blast_in , db_dir)
    }
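
    The same pattern can be tried in isolation. A minimal sketch, assuming DSL2; MAKE_FASTAS and COUNT_SEQS are hypothetical stand-ins for CONSENSUS_BUILDING and BLAST. A glob output such as "demo_ID/*.fasta" arrives as a single list when several files match, which is consistent with the BlankSeparatedList named in the error message, so the sketch flattens it before building the (name, file) tuples:

    ```nextflow
    // Hypothetical toy process: emits every FASTA file matched by a glob.
    process MAKE_FASTAS {
        output:
        path "demo_ID/*.fasta", emit: consensus_fastas

        script:
        """
        mkdir demo_ID
        printf '>a\\nACGT\\n' > demo_ID/a.fasta
        printf '>b\\nTTGA\\n' > demo_ID/b.fasta
        """
    }

    // Hypothetical toy consumer: receives one (name, file) pair per task.
    process COUNT_SEQS {
        input:
        tuple val(sampleID), path(fasta)

        output:
        path "${sampleID}.count"

        script:
        """
        grep -c '>' ${fasta} > ${sampleID}.count
        """
    }

    workflow {
        MAKE_FASTAS()

        MAKE_FASTAS.out.consensus_fastas
            .flatten()                                        // list of files -> one file per item
            .map { fasta -> tuple(fasta.simpleName, fasta) }  // pair each file with its simple name
            .set { per_file }

        COUNT_SEQS(per_file)
        COUNT_SEQS.out.view()
    }
    ```

    Each FASTA file then becomes its own COUNT_SEQS task, and calling getSimpleName() on a single file works where calling it on the whole list does not.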