
Dynamic "Fan In" in Argo Workflows


Argo permits the dynamic generation of parallel workflow steps based on outputs from previous steps.

An example of this dynamic workflow generation is provided here: https://github.com/argoproj/argo-workflows/blob/master/examples/loops-param-result.yaml

I'm trying to create a similar workflow with a final 'fan-in' step that will read outputs from the dynamically created parallel steps. Here's a stab at it:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-param-result-
spec:
  entrypoint: loop-param-result-example
  templates:
  - name: loop-param-result-example
    steps:
    - - name: generate
        template: gen-number-list
    - - name: write
        template: output-number
        arguments:
          parameters:
          - name: number
            value: "{{item}}"
        withParam: "{{steps.generate.outputs.result}}"
    - - name: fan-in
        template: fan-in
        arguments:
          parameters:
          - name: numbers
            value: "{{steps.write.outputs.parameters.number}}"

  - name: gen-number-list
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import json
        import sys
        json.dump([i for i in range(20, 31)], sys.stdout)

  - name: output-number
    inputs:
      parameters:
      - name: number
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
    outputs:
      parameters:
        - name: number
          valueFrom:
            path: /tmp/number.txt

  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    container:
      image: alpine:latest
      command: [sh, -c]
      args: ["echo received {{inputs.parameters.numbers}}"]

I'm able to submit this workflow, and it runs successfully. Unfortunately, the output of the final fan-in step looks like this:

fan-in: received {{steps.write.outputs.parameters.number}}

The value of the numbers input parameter is not being interpolated. Any ideas about how to get this working?


Solution

  • When a step fans out with withParam (or withItems), the output parameters of all its instances are aggregated and accessible via steps.STEP-NAME.outputs.parameters. It's not possible to access the aggregated values of a single parameter by name.

    This slight change to your workflow should get you what you need:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: loops-param-result-
    spec:
      entrypoint: loop-param-result-example
      templates:
      - name: loop-param-result-example
        steps:
        - - name: generate
            template: gen-number-list
        - - name: write
            template: output-number
            arguments:
              parameters:
              - name: number
                value: "{{item}}"
            withParam: "{{steps.generate.outputs.result}}"
        - - name: fan-in
            template: fan-in
            arguments:
              parameters:
              - name: numbers
                value: "{{steps.write.outputs.parameters}}"
    
      - name: gen-number-list
        script:
          image: python:alpine3.6
          command: [python]
          source: |
            import json
            import sys
            json.dump([i for i in range(20, 31)], sys.stdout)
    
      - name: output-number
        inputs:
          parameters:
          - name: number
        container:
          image: alpine:latest
          command: [sh, -c]
          args: ["echo {{inputs.parameters.number}} > /tmp/number.txt"]
        outputs:
          parameters:
            - name: number
              valueFrom:
                path: /tmp/number.txt
    
      - name: fan-in
        inputs:
          parameters:
            - name: numbers
        container:
          image: alpine:latest
          command: [sh, -c]
          args: ["echo received {{inputs.parameters.numbers}}"]
    

    The only change was to remove .number from {{steps.write.outputs.parameters.number}}.

    This is the new output:

    received [{number:20},{number:21},{number:22},{number:23},{number:24},{number:25},{number:26},{number:27},{number:28},{number:29},{number:30}]
    

    Here is the GitHub issue where output parameter aggregation was discussed and introduced.

    I've put in an enhancement proposal for accessing aggregated output parameters by name.
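Because the fan-in template splices the aggregated value into an unquoted `sh -c` command, the shell strips the double quotes, which is presumably why the output above shows `{number:20}` rather than JSON such as `{"number":"20"}`. The fragment below is a minimal sketch, assuming the aggregated value is a JSON array of `{"number": ...}` objects, of a replacement `fan-in` template that actually parses the list instead of just echoing it. It reuses the `python:alpine3.6` image from the question; the environment variable name `NUMBERS` is hypothetical and chosen only for illustration.

```yaml
  # Sketch: drop-in replacement for the fan-in template above.
  - name: fan-in
    inputs:
      parameters:
        - name: numbers
    script:
      image: python:alpine3.6
      command: [python]
      # Pass the aggregated JSON through an environment variable
      # (hypothetical name NUMBERS) so no shell parsing touches the quotes.
      env:
        - name: NUMBERS
          value: "{{inputs.parameters.numbers}}"
      source: |
        import json
        import os
        # Assumed shape: [{"number": "20"}, {"number": "21"}, ...]
        entries = json.loads(os.environ["NUMBERS"])
        # int() tolerates string values and surrounding whitespace
        numbers = [int(entry["number"]) for entry in entries]
        print("received", numbers, "sum", sum(numbers))
```

Routing the value through `env` rather than substituting it directly into `source` keeps quotes or newlines in the parameter from breaking the Python code. If you only want to see the raw JSON, wrapping the original `echo` argument in single quotes (`args: ["echo 'received {{inputs.parameters.numbers}}'"]`) should also stop the shell from removing the double quotes.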