Search code examples
pythonjsonpostgresqltweepytwitter-streaming-api

How to parse multiple json object comes from tweepy on_data method stream to postgresql in python?


I'm trying to parse data from multiple JSON objects returned from tweepy's on_data method (each line of the data stream contains one JSON object, each data stream contains one to several lines) and send it to PostgreSQL DB, but it always fails to insert record into table with TypeError('list indices must be integers, not str) message.

I'm not sure but I think the problem is located in the code where I try to data.splitlines() and read/process each line into the PostgreSQL. However, when I tried to print the data to the console, it always returns several lines of data and not be able to process it one by one.

I'm setting up a python script to collect twitter stream data using tweepy and try to store the data to apache spark and PostgreSQL DB so that I can load it into Grafana.

I also tried several solutions from here, here, and here. However, it seems that the problem is still there.

def on_data(self, data):
        (width, height) = getTerminalSize()
        try:
            # here data comes in a multiple json object each in a line; but when i'm trying to read it line by line using splitlines() and for loop for each line it still return multiple lines so that i can't isert into the postgresql table
            lines = data.splitlines()
            for line in lines:
                d = json.loads(line)                  
                # processing the data
                if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                    teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error:
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                elif 'extended_tweet' not in d and 'RT @' not in d['text'].encode('utf-8'):
                    teks_twit = str(d['text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error :
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                else:
                    pass

        except BaseException as e:
            print("Error on_data: %s" % str(e))

Example of expected result in each turn of for loop:

d['user']['screen_name']
fooUser

only one result for each turn.

Actual result:

d['user']['screen_name']
fooUser
userNumber2
anotherUser

more than one result for each turn in for loop.

Update:

I update the code and it seems that the result of d['user']['screen_name'] as well as other key's successfully return single item, right before it is inserted into the PostgreSQL table.

However, the TypeError('list indices must be integers, not str) message is still there. Still trying another option.

These are the changes:

...
try:
            # split each line of data
            for item in data.splitlines():
                # loads the data into json object
                d = json.loads(item)  
                # processing the data
                if d["user"]["lang"] == "id":
                    if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                        teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                        print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
...

and

...
try:
                            conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                            cursor = conn.cursor()
                            # insert ops
                            # check if the data is already separated
                            print d['user']['screen_name']
                            # insert data into table
                            sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                            cursor.execute(sqlquery, (str(d['user']['screen_name'].encode('utf-8')), str(d['extended_tweet']['full_text'].encode('utf-8')), str(d['entities']['hashtags']['text']), len(str(d['extended_tweet']['full_text'].encode('utf-8')).split())))

                            conn.commit()
                            count = cursor.rowcount
                            print (count, " Record inserted successfully into table")
                        except (Exception, psycopg2.Error) as error:
                            if(conn):
                                print("Failed to insert record into table", error)
                        finally:
                            #closing database connection.
                            if(conn):
                                cursor.close()
                                conn.close()
                                print(" [*] PostgreSQL connection is closed")
...

Solution

  • Finally after reading many sources and doing trial and error, and removing some field in the table which has list in it, I tried to simply modify the code into this:

    # split each line of data
         objects = data.splitlines()
         for line in objects:
            # loads the data into json object
            d = json.loads(line)
            # and then process the data...........
    

    and it finally be able to insert the data into the PostgreSQL DB. :)