Tags: python, redis, python-asyncio, fastapi, aioredis

Implement concurrent data fetch from Azure Redis Cache in Python


I am currently building a low-latency model inference API with FastAPI. We use Azure Cache for Redis (Standard tier) to fetch features and an ONNX model for fast inference. I am using aioredis to read data from Redis concurrently. I make two feature requests to Redis: one for the userId, which fetches a single string, and one for the products, which fetches a list of strings that I later convert to lists of floats via JSON parsing.

For a single request the whole call takes 70-80 ms overall, but with more than 10 concurrent requests the Redis fetches take more than 400 ms, which is huge and grows roughly linearly with the number of concurrent users during load testing.

The code for getting data from Redis is:

import numpy as np
import json
from ..Helpers.helper import curt_giver, milsec_calc
import aioredis
r = aioredis.from_url("redis://user:host", decode_responses=True)

async def get_user(user: str) -> str:
    # Single GET for the user feature string.
    user_data = await r.get(user)
    return user_data

async def get_products(product: list) -> list:
    # One MGET round trip for all product feature strings.
    product_data = await r.mget(product)
    return product_data

async def get_features(inputs: dict) -> list:
    
    st = curt_giver()
    user_data = await get_user(inputs['userId'])
    online_user_data = [json.loads(json.loads(user_data))]
    end = curt_giver()
    print("Time to get user features: ", milsec_calc(st,end))
    
    st = curt_giver()
    product_data = await get_products(inputs['productIds'])
    online_product_data = []
    for i in product_data:
        online_product_data.append(json.loads(json.loads(i)))
    end = curt_giver()
    print("Time to get product features: ", milsec_calc(st,end))

    user_outputs = np.asarray(online_user_data, dtype=object)
    product_outputs = np.asarray(online_product_data, dtype=object)
    output = np.concatenate(
        [np.concatenate([user_outputs] * product_outputs.shape[0]), product_outputs],
        axis=1)
    return output.tolist()

curt_giver() returns the current time in milliseconds, and milsec_calc(start, end) gives the elapsed milliseconds between two such readings.
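The helper implementations are omitted here; they essentially boil down to something like this (a rough equivalent, not the exact code):

    import time

    def curt_giver() -> float:
        # Roughly: current wall-clock time in milliseconds.
        return time.time() * 1000

    def milsec_calc(start: float, end: float) -> int:
        # Roughly: elapsed milliseconds between two curt_giver() readings.
        return round(end - start)

The code from the main file is: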

    from fastapi import FastAPI
    from v1.redis_conn.get_features import get_features
    
    from model_scoring.score_onnx import score_features
    from v1.post_processing.sort_results import sort_results
    
    from v1.api_models.input_models import Ranking_Input
    from v1.api_models.output_models import Ranking_Output
    from v1.Helpers.helper import curt_giver, milsec_calc
    import numpy as np
    
    
    app = FastAPI()
    
    # Sending user and product ids through body, 
    # Hence a POST request is well suited for this, GET has unexpected behaviour
    @app.post("/predict", response_model = Ranking_Output)
    async def rank_products(inp_req: Ranking_Input):
      beg = curt_giver()
      reqids = inp_req.dict()
      st = curt_giver()
      features = await get_features(reqids)
      end = curt_giver()
    
      print("Total Redis duration ( user + products fetch): ", milsec_calc(st,end))
    
      data = np.asarray(features,dtype=np.float32,order=None)
      
      st = curt_giver()
      scores = score_features(data)
      end = curt_giver()
    
      print("ONNX model duration: ", milsec_calc(st,end))
    
      Ranking_results = sort_results(scores, list(reqids["productIds"]))
      end = curt_giver()
      print("Total time for API: ",milsec_calc(beg,end))
      resp_json = {"requestId": inp_req.requestId,
                   "ranking": Ranking_results,
                   "zipCode": inp_req.zipCode}
    
      return resp_json    

From the timings I can see that a single request takes very little time, but with concurrent users the time to fetch the product data keeps increasing roughly linearly. Timings for a single request (all values are in milliseconds):

Time to get user features:  1
Time to get product features:  47
Total Redis duration ( user + products fetch):  53
ONNX model duration:  2
Total time for API:  60

Timings for more than 10 concurrent requests:

Time to get user features:  151
Time to get user features:  150
Time to get user features:  151
Time to get user features:  52
Time to get user features:  51
Time to get product features:  187
Total Redis duration ( user + products fetch):  433
ONNX model duration:  2
Total time for API:  440
INFO:     127.0.0.1:60646 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  239
Total Redis duration ( user + products fetch):  488
ONNX model duration:  2
Total time for API:  495
INFO:     127.0.0.1:60644 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  142
Total Redis duration ( user + products fetch):  297
ONNX model duration:  2
Total time for API:  303
INFO:     127.0.0.1:60648 - "POST /predict HTTP/1.0" 200 OK
Time to get product features:  188
Total Redis duration ( user + products fetch):  342
ONNX model duration:  2
Total time for API:  348

It keeps increasing beyond that, even hitting 900 ms+ to fetch both pieces of data from Redis. Is there any way I can fetch this data concurrently with low latency, so that it does not degrade as the number of concurrent requests grows (say to 500)? My target is under 300 ms for 300 concurrent requests every second.

I am stuck at this point; any help would be greatly appreciated.


Solution

  • It seems that some of your code is blocking the event loop. Looking at your log, the requests start off asynchronously (interleaved on a single event loop, not in parallel), but then all calls are handled one by one.

    Looking at your code, it never yields control back to the event loop after the line product_data = await get_products(inputs['productIds']).

    If the code after that await takes a long time, every other request has to wait and will effectively be executed serially. Some code is missing (this is not an MRE), so it is hard to say exactly where the time goes; for example, score_features is not shown, and the double json.loads calls and the numpy work in get_features are CPU-bound and run directly on the event loop.

    The bottom line is: if you want higher concurrency in FastAPI, you need more processes running your code. That means either more Uvicorn workers, or a load balancer in front of several Uvicorn instances (assuming you are using Uvicorn). Beyond that, try to hunt down any blocking I/O that could be made non-blocking. However, I suspect the majority of your blocking code is CPU-intensive rather than I/O-intensive, so increasing the number of Python processes handling your requests would be your best move. A sketch of the I/O-side changes follows below.
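    As an illustration of the I/O side (a sketch under assumptions, not a drop-in fix): the two Redis reads can at least be issued concurrently with asyncio.gather, and the CPU-heavy parsing can be pushed off the event loop with run_in_executor so other requests keep being served while it runs. The snippet reuses get_user and get_products from the question; _parse_features is a hypothetical helper introduced here just to hold the existing JSON/numpy logic.

        import asyncio
        import json
        import numpy as np

        def _parse_features(user_data: str, product_data: list) -> list:
            # CPU-bound parsing/assembly, kept in a plain function so it can be
            # offloaded to a worker thread instead of blocking the event loop.
            online_user_data = [json.loads(json.loads(user_data))]
            online_product_data = [json.loads(json.loads(p)) for p in product_data]
            user_outputs = np.asarray(online_user_data, dtype=object)
            product_outputs = np.asarray(online_product_data, dtype=object)
            output = np.concatenate(
                [np.concatenate([user_outputs] * product_outputs.shape[0]), product_outputs],
                axis=1)
            return output.tolist()

        async def get_features(inputs: dict) -> list:
            # Issue both Redis reads at the same time instead of one after the other.
            user_data, product_data = await asyncio.gather(
                get_user(inputs["userId"]),
                get_products(inputs["productIds"]),
            )
            # Run the JSON/numpy work in a thread so the event loop stays free
            # to accept and serve other requests in the meantime.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, _parse_features, user_data, product_data)

    The same idea applies to score_features: if it takes non-trivial CPU time, run it via run_in_threadpool (from fastapi.concurrency) or a process pool. Note that a thread pool only keeps the event loop responsive; it does not add CPU capacity, which is why more Uvicorn worker processes (for example uvicorn main:app --workers 4) remain the main lever for a target like 300 requests per second.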