Following the example from "Apache Beam Pipeline to read from REST API runs locally but not on Dataflow", my pipeline requests data from an API with
response = requests.get(url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)
where the url string is
url = "https://host:port/car('power%203')/speed"
The pipeline fails with the following error. Notice the extra \ around 'power%203':
InvalidSchema: No connection adapters were found for '(("https://host:post/car(\'power%203\')/speed",),)' [while running 'fetch API data']
The idea is to develop and test pipelines locally and then run them in production on GCP Dataflow. The request works outside the pipeline but fails inside the Python Apache Beam pipeline. Running the pipeline on DirectRunner from a WSL2 Ubuntu conda Python 3.9 environment or from a cloud JupyterHub returns the same error. The full pipeline example is below:
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import requests
import json
from requests.auth import HTTPDigestAuth

class get_api_data(beam.DoFn):
    def __init__(self, url):
        self.url = url,
        self.USER = 'user'
        self.PASSWORD = 'password'

    def process(self, buffer=[]):
        logging.info(self.url)
        headers = {
            'Prefer': f'data.maxpagesize=2000',
        }
        response = requests.get(self.url, auth=HTTPDigestAuth(self.USER, self.PASSWORD), headers=headers)
        buffer = response.json()['value']
        return buffer

class Split(beam.DoFn):
    def process(self, element):
        try:
            etag = element['etag']
            car_id = element['carID']
            power = element['power']
            speed = element['speed']
        except ValueError as e:
            logging.error(e)
        return [{
            'etag': str(etag),
            'car_id': str(car_id),
            'power': int(power),
            'speed': float(speed),
        }]

def run(argv=None):
    url = "https://host:port/car('power%203')/speed"
    # pipeline_options was not defined in the posted snippet; built from argv here
    pipeline_options = PipelineOptions(argv)
    p1 = beam.Pipeline(options=pipeline_options)
    ingest_data = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'fetch API data' >> beam.ParDo(get_api_data(url))
        | 'split records' >> beam.ParDo(Split())
        | 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )
    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
This really confused me, and I would be grateful for any suggestions or comments on why the url string gets distorted.
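Remove the comma after url in the get_api_data class; that should fix the problem. With the trailing comma, self.url becomes a one-element tuple instead of a string, so requests receives the text representation of the tuple (parentheses and escaped quotes included) rather than the URL itself.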
class get_api_data(beam.DoFn):
    def __init__(self, url):
        self.url = url
        self.USER = 'user'
        self.PASSWORD = 'password'
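
To see why one comma matters, here is a minimal sketch in plain Python, with no Beam or requests involved. The example.com URL and the variable names are made up for illustration; only the trailing-comma behavior comes from the question.

```python
# Hypothetical URL, used only to illustrate the effect of the trailing comma.
url = "https://example.com/car('power%203')/speed"

with_comma = url,     # trailing comma: this builds a one-element tuple
without_comma = url   # no comma: this stays a plain string

print(type(with_comma).__name__)     # the tuple type
print(type(without_comma).__name__)  # the str type

# Converting the tuple to text wraps the URL in parentheses and quotes.
# Taking repr() of that text then escapes the inner single quotes,
# which is where the backslashes in the error message come from.
as_text = str(with_comma)
print(as_text)
print(repr(as_text))
```

Logging type(self.url) inside process is a quick way to catch this kind of mistake before the request is sent.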