Tags: python, python-asyncio, python-decorators, rate-limiting, ccxt

Applying conditional rate limiting decorator for async class method


I am using the ccxt library to download OHLC data from various crypto exchanges. Internally, ccxt does this with REST requests. The exchanges have different request rate limits, so I have to rate-limit my methods. For that I use the limiter library. Here is what I have at the moment:

import ccxt.async_support as ccxt
from limiter import Limiter

limiter_binance = Limiter(rate=19, capacity=19, consume=1)
limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)

class CCXTFeed:

    def __init__(self, exchange_name):
        self._exchange = ccxt.binance()
        self._exchange_name = exchange_name

    @limiter_binance 
    async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
        data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
        return data

I now want to apply limiter_coinbase when the class is instantiated with exchange_name="coinbase". More generally, I want to choose which rate limiter decorator is applied based on the exchange I am working with. The exchange name is only known at runtime.


Solution

  • You could create a custom decorator that dispatches each call to the correct limiter at runtime, for instance:

    
    import functools

    import ccxt.async_support as ccxt
    from limiter import Limiter
    
    limiter_binance = Limiter(rate=19, capacity=19, consume=1)
    limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)
    
    def dispatch_limiter(func):
        @functools.wraps(func)  # keep the wrapped method's name and docstring
        async def mod_func(self, *args, **kwargs):
            # Each limiter is intended to be used as a decorator, so we first
            # wrap func with the matching limiter and then call the result as
            # if it were the original function.
            if self._exchange_name == "coinbase":
                return await limiter_coinbase(func)(self, *args, **kwargs)
            elif self._exchange_name == "binance":
                return await limiter_binance(func)(self, *args, **kwargs)
            # Fail loudly instead of silently returning None for an unknown exchange.
            raise ValueError(f"No rate limiter configured for {self._exchange_name!r}")
        return mod_func
    
    class CCXTFeed:
    
        def __init__(self, exchange_name):
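            # Instantiate the exchange class matching exchange_name (e.g. ccxt.binance)
            # instead of always talking to Binance.
            self._exchange = getattr(ccxt, exchange_name)()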
            self._exchange_name = exchange_name
    
        @dispatch_limiter
        async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
            data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
            return data
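
As more exchanges are added, the if/elif chain can be swapped for a dictionary lookup. The following is only a sketch of that idea, not part of the original answer. To keep it runnable without network access or third-party packages, it uses a hypothetical `CountingLimiter` class as a stand-in for `limiter.Limiter` (it counts calls instead of throttling), and the `LIMITERS`, `Feed`, `fetch_chunk` and `demo` names are likewise made up for illustration. It assumes that each limiter can be applied as a decorator to an async function, as in the code above.

```python
import asyncio
import functools


class CountingLimiter:
    """Hypothetical stand-in for limiter.Limiter: counts calls instead of throttling."""

    def __init__(self):
        self.calls = 0

    def __call__(self, func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            self.calls += 1  # a real limiter would wait for a free token here
            return await func(*args, **kwargs)
        return wrapper


# One limiter per exchange name; with the real library these would be
# Limiter(rate=..., capacity=..., consume=...) instances.
LIMITERS = {
    "binance": CountingLimiter(),
    "coinbase": CountingLimiter(),
}


def dispatch_limiter(func):
    # Wrap func once per exchange at decoration time, not on every call.
    limited = {name: limiter(func) for name, limiter in LIMITERS.items()}

    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        try:
            target = limited[self._exchange_name]
        except KeyError:
            raise ValueError(
                f"No rate limiter configured for {self._exchange_name!r}"
            ) from None
        return await target(self, *args, **kwargs)

    return wrapper


class Feed:
    def __init__(self, exchange_name):
        self._exchange_name = exchange_name

    @dispatch_limiter
    async def fetch_chunk(self, symbol):
        # Placeholder for the real fetch_ohlcv call.
        return f"{self._exchange_name}:{symbol}"


def demo():
    # Run one rate-limited call for a feed created with a runtime exchange name.
    return asyncio.run(Feed("coinbase").fetch_chunk("BTC/USD"))
```

Calling `demo()` sends the request through the "coinbase" limiter only. Supporting another exchange then means adding one entry to `LIMITERS` rather than another branch in the decorator.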