Search code examples
azureazure-devopsazure-pipelinesazure-repos

Azure child pipelines across repositories with responses


I have multiple repos thanks to a micro-frontend architecture and would like to run the test suite found in another repo on a specified environment/url.

My parent pipeline creates and environment for a PR preview. Rather than checkout the other repos, I was wondering if it possible to call the child repos pipeline?

I have looked at Trigger one pipeline after another but it doesn't appear to fail a parent pipeline, should a child pipeline fail.

For example:

  • parent repo pipelines build pr env
  • 1a. child repo 1 pipeline tests env x and passes
  • 1b. [child repo 2 pipeline tests env x and fails
  • 2 parent repo pipeline fails

The hope is that, if we can do this, the test suite and tooling can be managed by the child app teams, without the parent having to support specific frameworks, allowing independence.


Solution

  • Only way I can see this working is using custom script task in parent pipeline to call rest API to queue the child pipeline and wait for the result.

    Example python script

    import requests
    import time
    import sys
    
    # Azure DevOps organization URL
    org_url = 'https://dev.azure.com/your_org'
    
    # Azure DevOps project name
    project_name = 'your_project'
    
    # Azure DevOps pipeline ID
    pipeline_id = 'your_pipeline_id'
    
    # Azure DevOps personal access token with access to pipelines
    pat = 'your_personal_access_token' # System_AccessToken can be passed from pipeline to use the PAT from the build service account 
    
    # Queue the pipeline
    queue_url = f'{org_url}/{project_name}/_apis/pipelines/{pipeline_id}/runs?api-version=6.0-preview.1'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {pat}'
    }
    response = requests.post(queue_url, headers=headers)
    if response.status_code != 200:
        print(f'Error queuing pipeline: {response.text}')
        sys.exit(1)
    
    # Get the pipeline run ID
    run_id = response.json()['id']
    print(f'Queued pipeline run {run_id}')
    
    # Wait for the pipeline to complete
    status_url = f'{org_url}/{project_name}/_apis/pipelines/runs/{run_id}?api-version=6.0-preview.1'
    while True:
        response = requests.get(status_url, headers=headers)
        if response.status_code != 200:
            print(f'Error getting pipeline status: {response.text}')
            sys.exit(1)
    
        result = response.json()['result']
        if result is not None:
            break
    
        print('Pipeline still running, checking again in 10 seconds')
        time.sleep(10)
    
    # Check the result of the pipeline
    if result == 'succeeded':
        print('Pipeline run succeeded')
        sys.exit(0)
    else:
        print('Pipeline run failed')
        sys.exit(1)