Tags: python, asynchronous, python-asyncio, webhooks, alpaca

Why is my websocket coroutine not being called in the following code?


I am using alpaca-py, Alpaca's new Python package, to create a basic trading bot. My goal is to have the bot make a trade (buy), get data back from Alpaca's webhook on whether the order was filled (along with some other information), and then make another trade with the same shares (sell). Before I attempted to integrate the webhook, the bot was buying and selling fine. However, I cannot seem to get the coroutine up and running.

I have tried the following:

  1. Moved await statements to different areas in the coroutines.
  2. Changed the placement of the method and removed async from various methods.
  3. Checked Alpaca's documentation. (Unfortunately, alpaca-py launched in 2023 and a lot of their documentation is outdated.)
  4. Read the TradingStream code to ensure I am doing everything correctly. All looks good.
  5. Changed the asyncio.gather call and ran them both as routines. I get the same result.
  6. Added logger statements to the code. This clued me in that my method 'trade_update_handler' isn't being called, as nothing gets printed to the console.
  7. Used 'run()' instead of '_run_forever()'; however, this causes an error on the webhook side.

I am using Django to run the bot, as I like its BaseCommand class. I don't think Django has anything to do with the issue. Here is my code:

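# Imports inferred from the names used below (an assumption; the original post omits them).
# ALPACA_ID and ALPACA_KEY are expected to come from the project's own settings.
import asyncio
import datetime
import logging
import random
from time import sleep

from alpaca.trading.client import TradingClient
from alpaca.trading.enums import OrderSide, TimeInForce, TradeEvent
from alpaca.trading.requests import MarketOrderRequest
from alpaca.trading.stream import TradingStream
from django.core.management.base import BaseCommand

logger = logging.getLogger(__name__)  # assumed logger setup

class TradeMaker():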
    def __init__(self, **kwargs):
        self.paper_bool = kwargs.get('paper_bool', True)
        self.random_bool = kwargs.get('random', True)
        self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
        self.amount = kwargs.get('amount', 40000)
        self.seconds_between = kwargs.get('seconds_between', 4)
        self.log = kwargs.get('log')
        self.trading_client, self.trading_stream, self.account = self.open_client()
        self.trade_update_info = None
        self.order_filled = False
        self.shares_bought = 0
        self.current_symbol = None
    
    def open_client(self):
        trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        try:
            account = trading_client.get_account()
        except Exception as e:
            logger.error(f"Exception in login: {e}")
        return trading_client, trading_stream, account
    
    async def trade_update_handler(self, data):
        logger.info('Trade Update called')
        print("Trade Update:", data)
        if data.event == TradeEvent.FILL:
            if data.order.side == OrderSide.BUY:
                self.order_filled = True
                self.shares_bought = data.order.filled_qty
                self.current_symbol = data.order.symbol

    async def run_stream(self):
        logger.info('Subscribing to trade updates')
        self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
        logger.info('Preparing stream')
        await self.trading_stream._run_forever()

    async def stop_stream(self):
        logger.info('Stopping stream')
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        await trading_stream.stop()

    def get_symbol(self):
        if self.random_bool:
            symbol = random.choice(self.symbol_or_symbols)
            return symbol
        else:
            symbol = self.symbol_or_symbols
            return symbol

    def buy(self):
        symbol = self.get_symbol()
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=1,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_buy = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to buy {symbol}: {e}")
            return None
        return symbol, market_order_buy

    def sell(self, symbol):
        symbol = symbol
        shares = self.shares_bought
        
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=250,
            side=OrderSide.SELL,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_sell = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to sell {symbol}: {e}")
            return None
        return market_order_sell

    async def make_trades(self):
        market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
        while datetime.datetime.now() < market_close:
            seconds = self.seconds_between
            try:
                symbol, market_order_buy = self.buy()
                print(f"Bought {symbol}: {market_order_buy}")
            except Exception as e:
                logger.error(f"Failed to buy during trade: {e}")
                return None
            while not self.order_filled:
                logger.info('Waiting for order status update')
                await asyncio.sleep(1)
            sleep(seconds)
            try:
                market_order_sell = self.sell(symbol=symbol)
                print(f"Sold {self.current_symbol}: {market_order_sell}")
            except Exception as e:
                logger.error(f"Failed to sell during trade: {e}")
                return None
            self.order_filled = False
            self.shares_bought = 0
            sleep(seconds)
        print('Market closed, shutting down.')

class Command(BaseCommand):
    help = """This bot trades the target stock. If you want it to choose randomly, pass it a list and set the variable random=True
    """
    model = None

    def add_arguments(self, parser):
        parser.add_argument(
            '--paper',
            type=bool,
            help='Set false to live trade.',
            default=True
        )
        parser.add_argument(
            '--folder',
            type=str,
            help='source folder for files',
            default=''
        )
        parser.add_argument(
            '--symbol',
            type=str,
            help='target symbol, or list of symbols',
            default='AAPL'
        )
        parser.add_argument(
            '--random',
            type=bool,
            help="Set to true if passing a list of symbols to choose randomly from.",
            default=False
        )
        parser.add_argument(
            '--tradevalue',
            type=int,
            help="The amount the bot should trade. e.g. $40000",
            default=40000
        )
        parser.add_argument(
            '--seconds',
            type=int,
            help="The number of seconds the bot should wait between each trade.",
            default=4
        )

    def handle(self, **options):
        paper_bool = options['paper']
        random_bool = options['random']
        symbol_or_symbols = options['symbol']
        amount = options['tradevalue']
        seconds_between = options['seconds']
        log = options['folder']
        tm = TradeMaker(
            paper_bool = paper_bool,
            random = random_bool,
            symbol_or_symbols = symbol_or_symbols,
            amount = amount,
            seconds_between = seconds_between,
            log = log
        )
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.gather(
                tm.run_stream(),
                tm.make_trades()
            ))
        except KeyboardInterrupt:
            tm.stop_stream()
            print("Stopped with Interrupt")
        finally:
            tm.stop_stream()
            loop.close()

When I run the command, I get the following output in my terminal (information censored for security):

python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update

If I run the webhook separately in another terminal while my bot runs, it works. I can run the following code:

from alpaca.trading.stream import TradingStream

trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)

async def update_handler(data):
    print(data)

trading_stream.subscribe_trade_updates(update_handler)
trading_stream.run()

It prints out all the data as my bot runs. Why would it work separately, but not in a coroutine?


Solution

  • Django was causing the issue. After I removed the bot from Django, made it a standalone script, and added a few more async statements, it now works.

    Django expanded its async support in 4.2, but this project was running Django 4.1.
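
The solution mentions "a few more async statements" without showing them. As a rough sketch of what a standalone layout might look like, the example below uses only the standard library. `FakeTradingStream` is a hypothetical stand-in for alpaca-py's `TradingStream`, and `TradeLoop`, `submit`, and `run_forever` are illustrative names, not part of any real API. The point it illustrates is that the update handler only gets a chance to run when the trade loop yields control with `await`. The `make_trades` method in the question also calls a plain `sleep(seconds)`, which (assuming it is `time.sleep`) would block the event loop for that duration.

```python
import asyncio


class FakeTradingStream:
    """Hypothetical stand-in for alpaca-py's TradingStream (not the real API)."""

    def __init__(self):
        self._handler = None
        self._orders = asyncio.Queue()

    def subscribe_trade_updates(self, handler):
        self._handler = handler

    def submit(self, symbol):
        # Pretend an order was sent to the broker.
        self._orders.put_nowait(symbol)

    async def run_forever(self):
        # Deliver one simulated "fill" event per submitted order.
        while True:
            symbol = await self._orders.get()
            await asyncio.sleep(0.01)  # simulated network latency
            await self._handler({"event": "fill", "symbol": symbol})


class TradeLoop:
    """Illustrative trade loop that waits for a fill before continuing."""

    def __init__(self, stream, rounds=2, pause=0.01):
        self.stream = stream
        self.rounds = rounds
        self.pause = pause
        self.filled = asyncio.Event()
        self.fills = []

    async def on_update(self, data):
        if data["event"] == "fill":
            self.fills.append(data["symbol"])
            self.filled.set()

    async def make_trades(self):
        for _ in range(self.rounds):
            self.filled.clear()
            self.stream.submit("AAPL")
            # Awaiting here yields to the event loop so the stream can run.
            await asyncio.wait_for(self.filled.wait(), timeout=1)
            # Non-blocking pause; time.sleep() here would stall the stream too.
            await asyncio.sleep(self.pause)


async def main():
    stream = FakeTradingStream()
    bot = TradeLoop(stream)
    stream.subscribe_trade_updates(bot.on_update)
    stream_task = asyncio.create_task(stream.run_forever())
    try:
        await bot.make_trades()
    finally:
        # Shut the stream down and wait for the cancellation to finish.
        stream_task.cancel()
        await asyncio.gather(stream_task, return_exceptions=True)
    return bot.fills


if __name__ == "__main__":
    print(asyncio.run(main()))
```

An `asyncio.Event` replaces the polling `while not self.order_filled` loop here purely as a design choice. The polling version with `await asyncio.sleep(1)` would also yield to the event loop.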