Tags: python, python-requests, google-cloud-dataflow, apache-beam

Python Apache Beam error "InvalidSchema: No connection adapters were found for" when requesting an API URL with spaces


Following the example from "Apache Beam Pipeline to read from REST API runs locally but not on Dataflow", my pipeline requests data from an API with

response = requests.get(url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)

where the url string is

url = "https://host:port/car('power%203')/speed"

The pipeline fails with the error below. Notice the extra \ around 'power%203':

InvalidSchema: No connection adapters were found for '(("https://host:post/car(\'power%203\')/speed",),)' [while running 'fetch API data']

The idea is to develop and test pipelines locally and then run them in production on GCP Dataflow. The request works outside the pipeline but fails inside the Python Apache Beam pipeline. Running the pipeline on DirectRunner, either from a WSL2 Ubuntu conda Python 3.9 environment or from a cloud JupyterHub, returns the same error. The full pipeline example is below:

import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import requests
import json
from requests.auth import HTTPDigestAuth

class get_api_data(beam.DoFn):
    def __init__(self, url):
        self.url = url,
        self.USER = 'user' 
        self.PASSWORD = 'password'

    def process(self, buffer=[]):        
        logging.info(self.url)
        headers = {
            'Prefer': f'data.maxpagesize=2000',
        }        
        response = requests.get(self.url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)
        buffer = response.json()['value']
        return buffer


class Split(beam.DoFn):
    def process(self, element):
        try:
            etag = element['etag']
            car_id = element['carID']
            power = element['power']
            speed = element['speed']
        except ValueError as e:
            logging.error(e)

        return [{
            'etag': str(etag),
            'car_id': str(car_id),
            'power': int(power),
            'speed': float(speed),
        }]

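def run(argv=None):
    url = "https://host:port/car('power%203')/speed"
    # The snippet as posted used pipeline_options without defining it;
    # build it from the command-line arguments here.
    pipeline_options = PipelineOptions(argv)
    p1 = beam.Pipeline(options=pipeline_options)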
    ingest_data = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'fetch API data' >> beam.ParDo(get_api_data(url)) 
        | 'split records' >> beam.ParDo(Split())
        | 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

This has me really confused, and I would be grateful for any suggestions or comments on why the url string gets distorted.


Solution

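  • Remove the comma after url in the get_api_data class. The trailing comma in "self.url = url," makes self.url a one-element tuple instead of a string, so requests.get receives the text of that tuple rather than a valid URL and cannot find a connection adapter for it. The backslashes are not in the URL itself; they most likely come from how Python escapes the quotes when it prints that text in the error message. With the comma removed, self.url is a plain string again: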

    class get_api_data(beam.DoFn):
        def __init__(self, url):
            self.url = url
            self.USER = 'user' 
            self.PASSWORD = 'password'
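
To see the effect of the trailing comma in isolation, without Beam or any network call, here is a minimal sketch. The URL is the placeholder from the question, and check_url is a hypothetical helper added only for illustration; it is not part of requests or Apache Beam.

```python
url = "https://host:port/car('power%203')/speed"

as_tuple = url,    # trailing comma: this is the one-element tuple (url,)
as_string = url    # no comma: a plain str

print(type(as_tuple).__name__)   # tuple
print(type(as_string).__name__)  # str

# The text of the tuple is what ends up where the URL should be.
print(str(as_tuple))


def check_url(value):
    """Hypothetical guard: fail early if the URL is not a plain string."""
    if not isinstance(value, str):
        raise TypeError(f"url must be a str, got {type(value).__name__}")
    return value
```

Calling something like check_url(url) in __init__ is one possible way to surface this kind of mistake when the DoFn is constructed, rather than later when the request is sent.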