Search code examples
pythonherokutwitterpsycopg2tweepy

passing psycopg2 cursor to tweepy on_status() method


I'm trying to pass a psycopg2 cursor to a tweepy stream.

The connection pool and cursors are configured in a seperate file. Only the cursor is passed as an argument to the main pipeline function in another file called get_tweet_topic.py. I need the cursor in the on_status() method because I have a query there that needs it for execution.

I cannot figure out how to pass the cursor onto the on_status() method in the MyStreamListener() class.

the error I get is:

2020-03-05T22:16:24.856945+00:00 app[worker.1]: self._target(*self._args, **self._kwargs)
2020-03-05T22:16:24.856945+00:00 app[worker.1]: File "/app/get_tweet_topic.py", line 81, in guess_topic_pipeline
2020-03-05T22:16:24.856946+00:00 app[worker.1]: status_streams.streaming_pipeline(api, cursor)
2020-03-05T22:16:24.856947+00:00 app[worker.1]: File "/app/status_streams.py", line 100, in streaming_pipeline
2020-03-05T22:16:24.856947+00:00 app[worker.1]: general_stream(api, cursor)
2020-03-05T22:16:24.856948+00:00 app[worker.1]: File "/app/status_streams.py", line 86, in general_stream
2020-03-05T22:16:24.856948+00:00 app[worker.1]: myStreamListener = MyStreamListener()
2020-03-05T22:16:24.856948+00:00 app[worker.1]: TypeError: __init__() missing 1 required positional argument: 'cursor'

Code:

status_streams.py:

import tweepy
import os

import db_queries
import follow

#define class for the stream listener
class MyStreamListener(tweepy.StreamListener):

    def __init__(self, cursor):
        super().__init__()
        self.cursor = cursor
        #set counter to only get 1200 tweets
        self.counter = 0
        self.max = 1200

    #get tweets
    def on_status(self, status):
        if not status.retweeted:
            status_dict = {'created_at': status.created_at.strftime('%y-%m-&d %H:%M'),
                    'source_stream': 'general stream',
                    'status_id': status.id_str,
                    'user_id': status.user.id_str,
                    'screen_name': status.user.name,
                    'tweet_text': status.text,
                    'num_likes': status.favorite_count,
                    'num_retweets': status.retweet_count}

            created_at = status_dict['created_at']
            source_stream = status_dict['source_stream']
            status_id = status_dict['status_id']
            user_id = status_dict['user_id']
            screen_name = status_dict['screen_name']
            tweet_text = status_dict['tweet_text']
            num_likes = status_dict['num_likes']
            num_retweets = status_dict['num_retweets']

            db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)

        self.counter +=1
        if self.counter == self.max:
            return False


#get tweets from list of followers
def following_stream(api, cursor, user_name):
    try:
        for status in tweepy.Cursor(api.user_timeline, tweet_mode='extended', include_rts=False, screen_name=user_name).items(1):
            #ignore retweets
            if not status.retweeted:
                status_dict = {'created_at': status.created_at.strftime('%y-%m-%d %H:%M'),
                               'source_stream': 'following stream',
                               'status_id': status.id_str,
                               'user_id': status.user.id_str,
                               'screen_name': status.user.name,
                               'tweet_text':status.full_text,
                               'num_likes':status.favorite_count,
                               'num_retweets':status.retweet_count}

                created_at = status_dict['created_at']
                source_stream = status_dict['source_stream']
                status_id = status_dict['status_id']
                user_id = status_dict['user_id']
                screen_name = status_dict['screen_name']
                tweet_text = status_dict['tweet_text']
                num_likes = status_dict['num_likes']
                num_retweets = status_dict['num_retweets']

                db_queries.insert_raw_tweets_table(cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)


#function that controls both streams
def streaming_pipeline(api, cursor):
    #get list of all users that are currently followed
    #iterate through the following_list and grab the single latest tweet
    following_list = follow.get_following(api)
    for user in following_list:
        f_stream = following_stream(api, cursor, user)

    #stream class is used here
    myStreamListener = MyStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=myStreamListener(cursor=self.cursor))
    stream.filter(languages=['en'], track=['the'])


    cursor.close()

relevant section of get_tweet_topic.py:

def guess_topic_pipeline(api, conn, model, corpus, classifier):

    while True:
        cursor = conn.cursor()
        db_queries.create_temp_tweets_table(cursor)
        conn.commit()

        #use pipeline to grab tweets off twitter
        print('Retrieving statuses from streams...')
        status_streams.streaming_pipeline(api, cursor)
        print('Done retrieving...')

relevant portion of connection pooling code:

        #get connection from pool, pass cursor as an argument, start topic extration thread
        topic_conn = conn_pool.getconn()
        topic_extraction_thread = Thread(target=get_tweet_topic.guess_topic_pipeline, kwargs={'api':api, 'conn': topic_conn, 'model': lda_model, 'corpus': lda_id2word, 'classifier': lda_huber_classifier})
        topic_extraction_thread.start()
        #return connection when done
        conn_pool.putconn(topic_conn)

insert_raw_tweets_table() function with actual query:

def insert_raw_tweets_table(cursor, createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets):
    cursor.execute('INSERT INTO tempTweets(createdAt, sourceStream, statusID, userID, screenName, tweetText) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)', (createdAt, sourceStream, statusID, userID, screenName, tweetText, numLikes, numRetweets))


Solution

  • @MauriceMeyer answered the question in the comments but here is the working code for clarity.

    I forgot to reference the cursor as self.cursor within the class and I forgot to pass the cursor as an argument when creating an instance of the class. I was passing the cursor as an argument after creating the instance, which is not correct.

    Correct code:

    class MyStreamListener(tweepy.StreamListener):
    
        def __init__(self, cursor):
            super().__init__()
            self.cursor = cursor
            #set counter to only get 1200 tweets
            self.counter = 0
            self.max = 1200
    
        #get tweets
        def on_status(self, status):
            if not status.retweeted:
                status_dict = {'created_at' : status.created_at.strftime('%y-%m-&d %H:%M'),
                               'source_stream' : 'general stream',
                               'status_id' : status.id_str,
                               'user_id' : status.user.id_str,
                               'screen_name' : status.user.name,
                               'tweet_text' : status.text,
                               'num_likes' : status.favorite_count,
                               'num_retweets' : status.retweet_count}
    
                created_at = status_dict['created_at']
                source_stream = status_dict['source_stream']
                status_id = status_dict['status_id']
                user_id = status_dict['user_id']
                screen_name = status_dict['screen_name']
                tweet_text = status_dict['tweet_text']
                num_likes = status_dict['num_likes']
                num_retweets = status_dict['num_retweets']
    
                                                       #▼ reference self.cursor here
                db_queries.insert_raw_tweets_table(self.cursor, created_at, source_stream, status_id, user_id, screen_name, tweet_text, num_likes, num_retweets)
    
            self.counter +=1
            if self.counter == self.max:
                return False
    
    
    
    
    #stream class is used here                ▼ reference cursor here
        myStreamListener = MyStreamListener(cursor)
                                                                     #▼ removed reference to cursor here
        stream = tweepy.Stream(auth=api.auth, listener=myStreamListener)
        stream.filter(languages=['en'], track=['the'])