python, pandas, concurrent.futures

Retrieve API data into a dataframe using multithreading


I'm using a third-party API to retrieve 10-minute data over a large number of days for different tags. The current data pull can take up to several minutes, depending of course on the number of days and the number of tags. I'm therefore trying my hand at multithreading, which I understand can be useful for heavy I/O operations.
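To illustrate the idea with a minimal, self-contained sketch (unrelated to the API above; slow_fetch is a made-up stand-in for a single blocking request): with a thread pool, the ten one-second "requests" below wait on I/O concurrently, so the total time is close to the slowest call rather than the sum of all calls.

import time
import concurrent.futures

def slow_fetch(tag):
    # Hypothetical stand-in for one blocking HTTP request.
    time.sleep(1)
    return tag

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(slow_fetch, ['tag%d' % i for i in range(10)]))
print('10 one-second calls took %.1f s with threads (vs ~10 s sequentially)'
      % (time.perf_counter() - start))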

The API call goes as follows (I've replaced the actual API name):

import numpy as N 
import requests as r 
import json 
import pandas as pd
from datetime import datetime 
import concurrent.futures

  
class pyGeneric: 
  
    def __init__(self, serverName, apiKey, rootApiUrl='/Generic.Services/api'): 
        """ 
        Initialize a connection to the server, and return a pyGeneric server object
        """ 
        self.baseUrl = serverName + rootApiUrl 
        self.apiKey = apiKey 
        self.bearer = 'Bearer ' + apiKey 
        self.header = {'mediaType':'application/json','Authorization':self.bearer} 
  
    def getRawMeasurementsJson(self, tag, start, end):
        apiQuery = '/measurements/' + tag + '/from/' + start + '/to/' + end + '?format=json' 
        dataresponse = r.get(self.baseUrl+apiQuery, headers=self.header) 
        data = json.loads(dataresponse.text) 
        return data 
                                                               
                                
    def getAggregatesPandas(self, tags, start, end):
        """        
        Return tag(s) in a pandas dataFrame
        """
        df = pd.DataFrame()
        if isinstance(tags, str):
            tags = [tags]
        for tag in tags:
            tempJson =  self.getRawMeasurementsJson(tag, start, end)
            tempDf = pd.DataFrame(tempJson['timeSeriesList'][0]['timeSeries'])
            name = tempJson['timeSeriesList'][0]['measurementName']
            df['TimeUtc'] = [datetime.fromtimestamp(i/1000) for i in tempDf['t']]
            df['TimeUtc'] = df['TimeUtc'].dt.round('min')
            df[name] = tempDf['v']
        return df
    

gener = pyGeneric('https://api.generic.com', 'auth_keymlkj9789878686')

An example call to the API would be: gener_df = gener.getAggregatesPandas('tag1.10m.SQL', '*-10d', '*')
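For reference, the parsing in getAggregatesPandas implies a response shaped roughly like the sketch below; the keys ('timeSeriesList', 'measurementName', 'timeSeries', 't', 'v') are the ones the code reads, while the timestamps and values are made up for illustration.

# Illustrative structure only, not the real API schema.
example_response = {
    'timeSeriesList': [
        {
            'measurementName': 'tag1.10m.SQL',
            'timeSeries': [
                {'t': 1650000000000, 'v': 42.0},  # 't' = epoch milliseconds, 'v' = reading
                {'t': 1650000600000, 'v': 43.5},
            ],
        },
    ],
}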

This call works OK for an individual tag, but for a list of tags it takes much longer, which is why I've been trying the following:

tags = ['tag1.10m.SQL',
        'tag2.10m.SQL',
        'tag3.10m.SQL',
        'tag4.10m.SQL',
        'tag5.10m.SQL',
        'tag6.10m.SQL',
        'tag7.10m.SQL',
        'tag8.10m.SQL',
        'tag9.10m.SQL',
        'tag10.10m.SQL']

startdate = "*-150d"
enddate = '*'

final_df = pd.DataFrame()

with concurrent.futures.ThreadPoolExecutor() as executor:
    args = ((i,startdate, enddate) for i in tags)
    executor.map(lambda p: gener.getAggregatesPandas(*p), args)

However, I'm unable to check whether gener.getAggregatesPandas is actually being executed. Ultimately I would like to gather the results into a dataframe called final_df, but I'm also unsure how to proceed. I've read in this post that appending inside the context manager leads to quadratic copies of the dataframe, which would ultimately slow things down.


Solution

  • If I understand correctly, you need a way to check whether getAggregatesPandas executed properly.

    You can do it like below.

    frames = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        args = ((tag, startdate, enddate) for tag in tags)
        results = executor.map(lambda p: gener.getAggregatesPandas(*p), args)
        # Iterating over the map() results re-raises any exception thrown inside
        # getAggregatesPandas, so a failed call surfaces here instead of being lost.
        for result in results:
            frames.append(result)
    # Concatenate once at the end: appending DataFrames inside the loop would cause
    # the quadratic copying you mention, and DataFrame.append returns a new frame
    # (it does not modify final_df in place) and is deprecated in recent pandas.
    final_df = pd.concat(frames, ignore_index=False)
    # Another approach is executor.submit plus concurrent.futures.as_completed
    # (as_completed needs futures, not the iterator returned by map); see the
    # sketch below.
    

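    If you also want to know which tag failed, a sketch building on the same idea (not part of the original answer; it assumes the gener, tags, startdate and enddate defined in the question) is to submit each call individually and map the futures back to their tags:

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_tag = {
            executor.submit(gener.getAggregatesPandas, tag, startdate, enddate): tag
            for tag in tags
        }
        frames = []
        for future in concurrent.futures.as_completed(future_to_tag):
            tag = future_to_tag[future]
            try:
                frames.append(future.result())
            except Exception as exc:
                print('%s failed: %s' % (tag, exc))

    final_df = pd.concat(frames, ignore_index=False)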