Tags: python, python-requests, python-asyncio, google-cloud-dataflow, apache-beam

Asynchronous API calls in Apache Beam


As the title says, I want to make asynchronous API calls in Apache Beam using Python.

Currently, I am calling the API inside a DoFn for each element in the PCollection.

DoFn code:

import json
import time

import apache_beam as beam
import requests


class textapi_call(beam.DoFn):
    def __init__(self, api_key):
        self.api_key = api_key

    def setup(self):
        # One session per DoFn instance so connections are reused.
        self.session = requests.Session()

    def process(self, element):
        address = element[3] + ", " + element[4] + ", " + element[5] + ", " + element[6] + ", " + element[7]
        # findplace_url is a helper (not shown) that builds the request URL.
        url = findplace_url(address, api_key=self.api_key)
        params = {"inputtype": "textquery",
                  "fields": "name,place_id,geometry,type,icon,permanently_closed,business_status"}
        start = time.time()
        res = self.session.get(url, params=params)  # blocks until the response arrives
        results = json.loads(res.content)
        time_taken = time.time() - start

        return [[element[0], address, str(results), time_taken]]

pipeline code:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(input_file, skip_header_lines=1)
    lines_list = lines | "to list" >> beam.Map(parse_csv)
    res = lines_list | "API calls" >> beam.ParDo(textapi_call(api_key))

How can I modify the code to make the API calls asynchronous or concurrent? I could not find any examples of this in Python.

I mainly want to improve performance. Please let me know if there is another way to make the API calls faster in Beam apart from horizontal scaling.


Solution

  • For something like this, where you're spending most of your time waiting for external services, there are a couple of ways to speed things up.

    The easiest would be to use something like BatchElements followed by a DoFn that would process a whole batch in parallel. This works best when the service in question has a batch API, but one can do it manually as well, e.g.

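    import concurrent.futures

    import apache_beam as beam
    import requests

    class ProcessBatchDoFn(beam.DoFn):
      def setup(self):
        self.session = requests.Session()
        # Executor itself is abstract; a thread pool suits I/O-bound requests.
        self.executor = concurrent.futures.ThreadPoolExecutor()

      def process(self, batch_of_elements):
        # element_to_url is a placeholder for however the request URL is built.
        urls = [element_to_url(e) for e in batch_of_elements]
        # map() returns results in input order, so they line up with the elements.
        for element, url, result in zip(
            batch_of_elements, urls,
            self.executor.map(self.session.get, urls)):
          yield element[0], url, str(result)  # or whatever is needed downstream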
    

    used as

    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(input_file, skip_header_lines=1)
        lines_list = lines | "to list" >> beam.Map(parse_csv)
        res = lines_list | beam.BatchElements() | beam.ParDo(ProcessBatchDoFn())
    

    Maybe using something like https://github.com/ross/requests-futures could make this simpler.
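
    To see the batch fan-out in isolation, here is a minimal sketch that runs without Beam or network access. The names fake_fetch and process_batch and the example.com URLs are made up for illustration; fake_fetch stands in for session.get and just sleeps to mimic latency.

```python
import concurrent.futures
import time


def fake_fetch(url):
    """Hypothetical stand-in for session.get(url); sleeps to mimic latency."""
    time.sleep(0.05)
    return f"response for {url}"


def process_batch(batch, executor):
    """Fetch every URL in the batch concurrently, yielding (url, response)."""
    # Executor.map returns results in input order, so zip pairs them correctly.
    for url, response in zip(batch, executor.map(fake_fetch, batch)):
        yield url, response


batch = [f"https://example.com/{i}" for i in range(8)]  # made-up URLs
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    start = time.perf_counter()
    results = list(process_batch(batch, executor))
    elapsed = time.perf_counter() - start  # wall time for the whole batch
```

    With as many workers as URLs, the batch should take roughly the latency of one request rather than the sum of all eight, which is where the speedup comes from.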

    The batch approach works well as long as the URLs all take about the same amount of time to fetch, since each batch finishes only when its slowest request returns. Another option is to do rolling requests, e.g.

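    import concurrent.futures
    import time

    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindow
    from apache_beam.utils.windowed_value import WindowedValue

    MAX_CONCURRENT_REQUESTS = 10  # illustrative value; tune for the service

    class ExpensivePerElementDoFn(beam.DoFn):
      def setup(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=MAX_CONCURRENT_REQUESTS)

      def start_bundle(self):
        self.pending = []

      def process(self, element):
        # fn is a placeholder for the per-element call, e.g. the HTTP request.
        self.pending.append(self.executor.submit(fn, element))
        yield from self.flush_done()
        # Apply backpressure once too many requests are in flight.
        while len(self.pending) > MAX_CONCURRENT_REQUESTS:
          time.sleep(0.1)
          yield from self.flush_done()

      def flush_done(self):
        done = [ix for ix, future in enumerate(self.pending) if future.done()]
        # Pop from the end so the earlier indices stay valid.
        for ix in reversed(done):
          future = self.pending.pop(ix)
          yield future.result()

      def finish_bundle(self):
        # finish_bundle may only emit WindowedValues; this assumes the global window.
        for future in self.pending:
          yield WindowedValue(
              future.result(), GlobalWindow().max_timestamp(), [GlobalWindow()])
        self.pending = []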
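
    As a variation on the polling loop, the rolling window can block on concurrent.futures.wait instead of sleeping for a fixed interval. The sketch below is Beam-free so it can be run on its own; fake_call, rolling_map and MAX_IN_FLIGHT are hypothetical names, and fake_call stands in for the real API call.

```python
import concurrent.futures
import random
import time

MAX_IN_FLIGHT = 4  # hypothetical cap on concurrent requests


def fake_call(element):
    """Hypothetical stand-in for the API call; latency varies per element."""
    time.sleep(random.uniform(0.01, 0.05))
    return element * 2


def rolling_map(elements, executor):
    """Yield results as they complete, keeping at most MAX_IN_FLIGHT pending."""
    pending = set()
    for element in elements:
        pending.add(executor.submit(fake_call, element))
        # Block only when the window is full, and only until one slot frees up.
        while len(pending) >= MAX_IN_FLIGHT:
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                yield future.result()
    # Drain whatever is still in flight once the input is exhausted.
    for future in concurrent.futures.as_completed(pending):
        yield future.result()


with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
    results = sorted(rolling_map(range(20), executor))
```

    Results come back in completion order rather than input order, so anything downstream that needs to match a result to its element should carry a key along with it.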