Search code examples
python binance bybit python-bybit pybit

Any way to use a proxy in pybit?


I want to route my requests through a proxy server instead of sending them directly to Bybit. I am using HTTP from pybit.unified_trading, and it doesn't seem to accept any parameter for connecting through a proxy.
This is my get_bybit_balance function for Bybit:

from pybit.unified_trading import HTTP

def get_bybit_balance(user_config):
    """Fetch the UNIFIED wallet balance from Bybit for one user.

    Args:
        user_config: Mapping that provides the keys "proxy_host",
            "proxy_port", "api_key" and "secret_key".

    Returns:
        ``{"UNIFIED": <wallet balance response>}`` on success, or
        ``{"error": <message>}`` if anything inside the ``try`` raises.
    """
    # The same proxy endpoint is used for both plain and TLS traffic.
    proxy_url = f'http://{user_config["proxy_host"]}:{user_config["proxy_port"]}'
    proxies = {
        'http': proxy_url,
        'https': proxy_url,
    }

    try:
        # NOTE: this session carries the proxy settings, but it is never
        # passed to the pybit client created below.
        session = requests.Session()
        session.proxies.update(proxies)

        client = HTTP(
            testnet=False,
            api_key=user_config["api_key"],
            api_secret=user_config["secret_key"]
        )
        print(type(client))
        # returns --> <class 'pybit.unified_trading.HTTP'>

        # Bind the response to the name that is actually returned; the
        # original assigned to `spot_balance` and returned an undefined name.
        unified_balance = client.get_wallet_balance(accountType="UNIFIED")

        return {
            "UNIFIED": unified_balance
        }
    except Exception as e:
        return {"error": str(e)}

I checked _http_manager.py, which has the following fields available:

class _V5HTTPManager:
    # Configuration fields quoted from pybit's _http_manager.py.
    # None of the fields listed here is proxy-related.
    # NOTE(review): `field` is presumably dataclasses.field and the class is
    # presumably decorated with @dataclass outside this excerpt — confirm.
    testnet: bool = field(default=False)
    domain: str = field(default=DOMAIN_MAIN)
    tld: str = field(default=TLD_MAIN)
    demo: bool = field(default=False)
    # Annotated as str, but the default is the bool False.
    rsa_authentication: str = field(default=False)
    api_key: str = field(default=None)
    api_secret: str = field(default=None)
    logging_level: logging = field(default=logging.INFO)
    log_requests: bool = field(default=False)
    timeout: int = field(default=10)
    # Annotated as bool, but the default is the int 5000.
    recv_window: bool = field(default=5000)
    force_retry: bool = field(default=False)
    # Declared with init=False and an empty-dict default factory.
    retry_codes: defaultdict[dict] = field(
        default_factory=dict,
        init=False,
    )
    # Declared with init=False and an empty-dict default factory.
    ignore_codes: dict = field(
        default_factory=dict,
        init=False,
    )
    # Annotated as bool, but the defaults below are the int 3.
    max_retries: bool = field(default=3)
    retry_delay: bool = field(default=3)
    # Annotated as bool, but the default is None.
    referral_id: bool = field(default=None)
    record_request_time: bool = field(default=False)
    return_response_headers: bool = field(default=False)

Is there any way to do this? I am doing a similar thing for Binance by passing requests_params to the client, like this:

        # Binance client: the proxies mapping is supplied through the
        # requests_params keyword argument.
        client = Client(
            user_config["api_key"], 
            user_config["secret_key"], 
            testnet=False,
            requests_params={'proxies': proxies}
        )

Solution

  • Create a client subclass like this:

    from pybit.unified_trading import HTTP as BybitHTTP
    
    class ProxyHTTP(BybitHTTP):
        """pybit HTTP client that can send its requests through a proxy.

        Args:
            proxy_host: Proxy host name or address. When it or
                ``proxy_port`` is missing/falsy, no proxy is used.
            proxy_port: Proxy port.
            **kwargs: Forwarded unchanged to the base pybit ``HTTP`` client.
        """

        def __init__(self, proxy_host=None, proxy_port=None, **kwargs):
            super().__init__(**kwargs)
            # NOTE(review): the proxy URL uses the https:// scheme; if the
            # proxy itself speaks plain HTTP this likely needs http:// — confirm.
            self.proxy_url = (
                f'https://{proxy_host}:{proxy_port}'
                if proxy_host and proxy_port
                else None
            )

        def _send_http_request(self, req, **kwargs):
            """Send the prepared request ``req``, via the proxy if configured.

            NOTE(review): assumes the base class routes outgoing requests
            through this hook — confirm against the installed pybit version.
            """
            if not self.proxy_url:
                return self.client.send(req, timeout=self.timeout)

            # Register the proxy for both schemes. The original dict literal
            # repeated the 'https' key, so no 'http' entry was ever set.
            settings = self.client.merge_environment_settings(
                req.url,
                proxies={'http': self.proxy_url, 'https': self.proxy_url},
                stream=None,
                verify=True,
                cert=None
            )
            return self.client.send(
                req, timeout=self.timeout, allow_redirects=False, **settings
            )
    

    Then use it in your main file:

    from bybit_client import ProxyHTTP