I am using ccxt
library for downloading OHLC data from various crypto exchanges. This is done internally using REST requests. These exchanges have different request rate limits, so I have to apply rate limiting to my methods. For rate limiting I use the library limiter
. Here is what I have at the moment:
import ccxt.async_support as ccxt
from limiter import Limiter
limiter_binance = Limiter(rate=19, capacity=19, consume=1)
limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)
class CCXTFeed:
def __init__(self, exchange_name):
self._exchange = ccxt.binance()
self._exchange_name = exchange_name
@limiter_binance
async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
return data
What I now want is to somehow apply limiter_coinbase
if the class was instantiated with exchange_name="coinbase"
. I want to be able to choose applied rate limiter decorator based on the exchange I am currently working with. The exchange name is defined at runtime.
What you could do is create a custom decorator that then will redirect you to the correct decorator, for instance:
import ccxt.async_support as ccxt
from limiter import Limiter
limiter_binance = Limiter(rate=19, capacity=19, consume=1)
limiter_coinbase = Limiter(rate=3, capacity=3, consume=1)
def dispatch_limiter(func):
async def mod_func(self, *args, **kwargs):
if self._exchange_name == "coinbase":
return await (limiter_coinbase(func)(self, *args, **kwargs)) # basically, because limiter_coinbase is intended to be a decorator, we first instantiate it with the function and then we call it as if it was the original function.
elif self._exchange_name == "binance":
return await (limiter_binance(func)(self, *args, **kwargs))
return mod_func
class CCXTFeed:
def __init__(self, exchange_name):
self._exchange = ccxt.binance()
self._exchange_name = exchange_name
@dispatch_limiter
async def __FetchSymbolOHLCChunk(self, symbol, timeframe, startTime, limit=1000):
data = await self._exchange.fetch_ohlcv(symbol, timeframe, since=startTime, limit=limit)
return data