Concatenate group-wise in a non-blocking way in Nextflow


I want to split a Nextflow workflow into separate chunks (vehicle types), split these chunks further (vehicles), and then report as soon as all workers for a specific vehicle type are done.

Currently nextflow main.nf results in:

 N E X T F L O W   ~  version 24.04.2

Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085

executor >  local (11)
[6e/28da8a] start     | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1

step2: boats2

step2: cars2

step2: cars3

step2: cars1

step2: cars4

all cars are done

all boats are done

What I would like instead is:

 N E X T F L O W   ~  version 24.04.2

Launching `nextflow_pipeline/main.nf` [amazing_bell] DSL2 - revision: aae9cee085

executor >  local (11)
[6e/28da8a] start     | 1 of 1 ✔
[36/743eea] step1 (2) | 2 of 2 ✔
[cd/50083f] step2 (6) | 6 of 6 ✔
[5f/a823c3] step3 (1) | 2 of 2 ✔
step2: boats1

step2: boats2

all boats are done

step2: cars2

step2: cars3

step2: cars1

step2: cars4

all cars are done

Minimal reproducible example:

process start {
    output:
        path "vehicle_types.list", emit: vehicle_type

    """
    #!/usr/bin/python3

    with open('vehicle_types.list', 'w') as fout:
        for vehicle in ['boats', 'cars']:
            fout.write(vehicle + "\\n")
    """
}

process step1 {
    debug true

    input:
        val vehicle_type

    output:
        path "vehicles.list", emit: vehicles

    """
    #!/usr/bin/python3

    import time
    if "${vehicle_type}" == 'boats':
        n = 2
    else:
        n = 4
    with open('vehicles.list', 'w') as fout:
        for i in range(n):
            time.sleep(2)
            fout.write("${vehicle_type}" + str(i + 1) + "\\n")
    """
}

process step2 {
    debug true

    input:
        val vehicle

    output:
        path "not_needed.txt", emit: hacky_output

    """
    echo step2: ${vehicle}
    echo hello > not_needed.txt
    """
}

process step3 {
    debug true

    input:
        val vehicle_type
        val hacky_variable_to_ensure_step2_completed

    """
    echo "all ${vehicle_type} are done"
    """
}

workflow {
    main:
        start()
        start.out.vehicle_type.splitText().map{it -> it.trim()}.set{vehicle_type}
        step1(vehicle_type)
        step1.out.vehicles.splitText().map{it -> it.trim()}.set{vehicles}
        step2(vehicles)
        step3(vehicle_type, step2.out.hacky_output.last())
}

Currently, step3 is blocked by .last(), which emits only after every step2 task has finished. I want to collect the results from step2 in a non-blocking way, so that step3 runs for a vehicle type as soon as all workers for that type are done, rather than waiting until all step2 workers are done. How do I achieve this?


Solution

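  • You can use the built-in groupKey() function to attach the expected size of each group to its key. The groupTuple operator then emits each group as soon as it has collected that many items, instead of waiting for the whole channel to complete. For example: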

    process step1 {
        tag "$vehicle_type"
        debug true
    
        input:
        val vehicle_type
    
        output:
        tuple val(vehicle_type), path("vehicles.list"), emit: vehicles
    
        """
        #!/usr/bin/python3
        import time
    
        n = 2 if "${vehicle_type}" == 'boats' else 4
        with open('vehicles.list', 'w') as fout:
            for i in range(n):
                time.sleep(2)
                fout.write("${vehicle_type}" + str(i + 1) + "\\n")
        """
    }
    
    process step2 {
        tag "$vehicle_type"
        debug true
    
        input:
        tuple val(vehicle_type), val(vehicle)
    
        output:
        tuple val(vehicle_type), path("not_needed.txt"), emit: hacky_output
    
        """
        echo "step2: ${vehicle}" && touch not_needed.txt
        """
    }
    
    process step3 {
        tag "$vehicle_type"
        debug true
    
        input:
        tuple val(vehicle_type), path("txt")
    
        """
        echo "all ${vehicle_type} are done"
        """
    }
    
    workflow {
    
        vehicle_types = Channel.of('boats', 'cars')
    
        step1( vehicle_types )
    
        step1.out.vehicles
            .map { vehicle_type, vehicle_list ->
                def vehicles = vehicle_list.readLines()
                def group_key =  groupKey( vehicle_type, vehicles.size() )
    
                tuple( group_key, vehicles )
            }
            .transpose()
            .set { vehicles }
    
        step2( vehicles )
        
        step3( step2.out.hacky_output.groupTuple() )
    }
    

    Results:

    $ nextflow run main.nf 
    
     N E X T F L O W   ~  version 24.04.3
    
    Launching `main.nf` [modest_nightingale] DSL2 - revision: 49168943ed
    
    executor >  local (10)
    [b1/dae30f] step1 (cars)  [100%] 2 of 2 ✔
    [4a/018f10] step2 (cars)  [100%] 6 of 6 ✔
    [eb/bab138] step3 (cars)  [100%] 2 of 2 ✔
    step2: boats2
    
    step2: boats1
    
    all boats are done
    
    step2: cars1
    
    step2: cars2
    
    step2: cars3
    
    step2: cars4
    
    all cars are done
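
The grouping mechanism can also be tried in isolation, without any processes. The following is a minimal sketch, assuming hard-coded vehicle lists in place of the files written by step1; only groupKey(), transpose(), groupTuple() and view() come from Nextflow itself, and the input data is made up for illustration:

```groovy
workflow {
    // Hypothetical stand-in for step1.out.vehicles: [ vehicle_type, list of vehicles ]
    Channel.of(
            [ 'boats', [ 'boats1', 'boats2' ] ],
            [ 'cars',  [ 'cars1', 'cars2', 'cars3', 'cars4' ] ]
        )
        .map { vehicle_type, vehicles ->
            // Attach the expected group size to the key
            tuple( groupKey( vehicle_type, vehicles.size() ), vehicles )
        }
        // One item per vehicle: [ key, vehicle ]
        .transpose()
        // Emits a group once it has received as many items as the key's size
        .groupTuple()
        .view()
}
```

Because each key carries its own size, groupTuple does not have to wait for the upstream channel to close. With plain keys and no size option, it can only emit the groups once all inputs have arrived, which is the blocking behaviour described in the question.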