Tags: twitter, python-asyncio, telegram, tweepy, python-telegram-bot

How to get a user's new Tweets with a Telegram Python bot while run_polling is running?


I'm currently developing a Telegram bot using python-telegram-bot and tweepy.
I want to add a feature that lets users of the bot register a list of Twitter IDs via Telegram and have new Tweets from those accounts sent to them in real time.

The bot should run application.run_polling() to receive commands from users and, at the same time, forward new Tweets from the Twitter accounts in each user's individual list.

When I read the tweepy documentation, I realized that I can get Tweets in real time with fewer API requests if I fetch them through a stream, i.e. MyStream(auth=auth, listener=None).
But I don't know how to get both features to work in the same file at the same time.

Versions
nest_asyncio 1.5.6, python-telegram-bot 20.0, tweepy 4.12.1

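from telegram.ext import (
    Application, CallbackQueryHandler, CommandHandler,
    ConversationHandler, MessageHandler, filters,
)

# The handler callbacks (input_id, add, button, on, start, ...) and the
# ADD state constant are defined elsewhere and not shown here.
def main() -> None:
    application = Application.builder().token("...").build()
    add_list = ConversationHandler(
        entry_points=[CallbackQueryHandler(input_id, pattern='input_id')],
        states={ADD: [MessageHandler(filters.TEXT & ~filters.COMMAND, add)]},
        fallbacks=[CallbackQueryHandler(button, pattern='back')])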

    application.add_handler(CommandHandler("on", on))
    application.add_handler(add_list)
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("list", list_setting))
    application.add_handler(CommandHandler("admin", admin))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CallbackQueryHandler(button))
    application.run_polling()

if __name__ == "__main__":
    main()

This is my main function. It works: application.run_polling() keeps the bot running until SIGINT (Ctrl+C) is received.
I want to run the code above and the following code at the same time.

import tweepy

consumer_key = "..." # Twitter API Key
consumer_secret = "..." # Twitter API Secret Key
access_token = "..." # Twitter Access Key
access_token_secret = "..." # Twitter Access Secret Key

usernames = ['...']
auth = tweepy.OAuth1UserHandler(
    consumer_key, consumer_secret, access_token, access_token_secret
)

# Convert screen names to user IDs
api = tweepy.API(auth)  # create the API client once and reuse it
user_ids = []
for username in usernames:
    user = api.get_user(screen_name=username)
    user_ids.append(str(user.id))

# Create a custom stream class
class MyStream(tweepy.Stream):
    def __init__(self, auth, listener=None):
        # tweepy 4.x streams take the four credentials directly;
        # `auth` and `listener` are accepted here but not used.
        super().__init__(consumer_key, consumer_secret, access_token, access_token_secret)

    def on_status(self, status):
        tweet_url = f"https://twitter.com/{status.user.screen_name}/status/{status.id_str}"
        print(f"{status.user.screen_name} tweeted: {status.text}\n{tweet_url}")
        # send message to telegram

# Create a stream object with the above class and authentication
myStream = MyStream(auth=auth, listener=None)

# Start streaming for the selected users
# (this call blocks unless threaded=True is passed)
myStream.filter(follow=user_ids)

I also tried checking at an interval from a thread and with python-telegram-bot's job_queue.run_repeating function,
but interval-based checks seem problematic for forwarding messages in real time.
I'm desperately looking for someone to help me with this😢.

UPDATE
Following the second link in CallMeStag's comment, I restructured the main function to look like this.

import asyncio

async def main() -> None:
    async with application:
        # await application.run_polling()
        await application.start()
        await application.updater.start_polling()
        await myStream.filter(follow=user_ids)
        await myStream.disconnect()
        await application.updater.stop()
        await application.stop()

if __name__ == "__main__":
    application = Application.builder().token("...").build()
    add_list = ConversationHandler(
        entry_points=[CallbackQueryHandler(input_id, pattern='input_id')],
        states={ADD :[MessageHandler(filters.TEXT & ~filters.COMMAND, add)],},
        fallbacks=[CallbackQueryHandler(button,pattern='back')])
    application.add_handler(add_list)
    application.add_handler(CommandHandler("list", list_setting))
    application.add_handler(CommandHandler("admin", admin))
    application.add_handler(CallbackQueryHandler(button))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("start", start))
    asyncio.run(main())

When I ran the file, the terminal output looked like this:

2023-02-22 08:13:34,760 - apscheduler.scheduler - INFO - Scheduler started
2023-02-22 08:13:34,760 - telegram.ext._application - INFO - Application started
2023-02-22 08:13:35,610 - tweepy.streaming - INFO - Stream connected

So I thought both asyncio frameworks were working fine. Fetching content from Twitter did work, but commands like /help got no response.
Does anyone know how to run the two frameworks together asynchronously, or another solution to the problem above?
If I split this into two files (one to receive the Tweets, one to run the bot), is there any way for the bot to get the live updates?
I would appreciate any advice from anyone who has developed this kind of bot.
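
A likely explanation for the symptom above, offered as a sketch rather than a confirmed diagnosis: tweepy.Stream.filter() is a blocking call unless threaded=True is passed, so calling it inside main() never hands control back to the event loop, and the polling task cannot process updates. The stdlib-only example below reproduces the effect. The names heartbeat, blocking_stream, async_stream and count_ticks are hypothetical stand-ins; nothing here uses tweepy or python-telegram-bot.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    """Stand-in for the bot's polling task: records a tick every 10 ms."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_stream() -> None:
    """Stand-in for a synchronous stream: blocks the whole event loop."""
    time.sleep(0.2)


async def async_stream() -> None:
    """Stand-in for an asynchronous stream: yields to the loop while waiting."""
    await asyncio.sleep(0.2)


async def count_ticks(stream) -> int:
    """Run the heartbeat alongside `stream` and count how often it ticked."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat run once before the stream starts
    await stream()
    task.cancel()
    return len(ticks)


blocked = asyncio.run(count_ticks(blocking_stream))
cooperative = asyncio.run(count_ticks(async_stream))
print(f"ticks with blocking stream: {blocked}")
print(f"ticks with async stream: {cooperative}")
```

With the blocking stand-in, the heartbeat ticks only once, before the stream starts. With the cooperative stand-in, it keeps ticking for the whole 0.2 seconds.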


Solution

  • I fixed this by using tweepy.asynchronous.AsyncStream instead of tweepy.Stream.

    import tweepy.asynchronous  # the submodule must be imported explicitly

    class MyStream(tweepy.asynchronous.AsyncStream):
        def __init__(self, auth, listener=None):
            # `auth` and `listener` are accepted here but not used.
            super().__init__(consumer_key, consumer_secret, access_token, access_token_secret)

        # The callback is a coroutine in the asynchronous stream
        async def on_status(self, status):
            tweet_url = f"https://twitter.com/{status.user.screen_name}/status/{status.id_str}"
            print(f"{status.user.screen_name} tweeted: {status.text}\n{tweet_url}")

    myStream = MyStream(auth=auth, listener=None)
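
A minimal sketch of how the pieces might fit together after this change. It assumes that AsyncStream.filter() schedules the connection as an asyncio.Task and returns it, and that disconnect() cancels that task. To stay runnable without credentials, FakeAsyncStream, ForwardingStream, fake_polling and the sample statuses are hypothetical stand-ins, not tweepy or python-telegram-bot APIs. In the real bot, on_status could forward the Tweet with something like await application.bot.send_message(chat_id=..., text=...) instead of appending to a list.

```python
import asyncio


class FakeAsyncStream:
    """Hypothetical stand-in mimicking the shape of an async stream class."""

    def __init__(self, statuses):
        self.statuses = statuses
        self.task = None

    async def on_status(self, status):
        """Override in a subclass, as with the real stream's callback."""

    async def _connect(self):
        for status in self.statuses:
            await asyncio.sleep(0.01)  # simulate waiting on the network
            await self.on_status(status)

    def filter(self):
        # Schedule the connection on the running loop and return at once.
        self.task = asyncio.create_task(self._connect())
        return self.task

    def disconnect(self):
        if self.task is not None:
            self.task.cancel()


class ForwardingStream(FakeAsyncStream):
    def __init__(self, statuses, outbox):
        super().__init__(statuses)
        self.outbox = outbox

    async def on_status(self, status):
        # Real bot: the Telegram send call would go here.
        self.outbox.append(f"forwarded: {status}")


async def fake_polling(handled, commands):
    """Stand-in for the updater: handles one command every 10 ms."""
    for command in commands:
        await asyncio.sleep(0.01)
        handled.append(command)


async def main():
    outbox, handled = [], []
    stream = ForwardingStream(["tweet 1", "tweet 2"], outbox)
    stream_task = stream.filter()  # returns immediately, runs in the background
    await fake_polling(handled, ["/help", "/list"])  # commands are still handled
    await stream_task  # wait for the simulated stream to finish
    stream.disconnect()  # no-op here because the task is already done
    return outbox, handled


outbox, handled = asyncio.run(main())
print(outbox)
print(handled)
```

Because the stream runs as a task on the same event loop, both the simulated Tweets and the simulated commands are processed, which is the behaviour the question was after.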