Search code examples
Tags: python, twitter, gevent, tweepy

Consuming Twitter stream with tweepy and serving content via websocket with gevent


My goal is to read the Twitter stream with tweepy and serve its content over a websocket with gevent. I have solved the two parts of the problem separately, in two simple scripts taken from the official documentation.

1) On one side, I can read the Twitter stream using tweepy:

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json

# Twitter API credentials, passed to OAuthHandler / set_access_token in the
# __main__ block below. Left empty here: fill them in before running.
access_token = ""
access_token_secret = ""
consumer_key = ""
consumer_secret = ""

class myStreamListener(StreamListener):

    def on_data(self, data):
        decoded = json.loads(data)
        if decoded["coordinates"] is not None:
            print decoded["coordinates"]["coordinates"]
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    # Authenticate first, then attach the listener to a stream and filter
    # it by a bounding box spanning the whole world.
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    listener = myStreamListener()
    stream = Stream(auth, listener)

    geobox_world = [-180, -90, 180, 90]
    stream.filter(locations=geobox_world)

This prints the required information to standard output.

2) On the other side, I took a dummy gevent example to build a 'thread' that can serve the data to a web client:

from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi
import gevent
import time

def app(environ, start_response):
    """WSGI app that pushes "hola<N>" to a websocket client once per second.

    Requests that were not upgraded to a websocket get a 400 response
    instead of failing with KeyError. The loop ends once the client has
    closed the socket.
    """
    ws = environ.get('wsgi.websocket')
    if ws is None:
        start_response('400 Bad Request', [('Content-Type', 'text/plain')])
        return [b'Expected a websocket connection\n']
    contador = 0
    while not ws.closed:
        strTemp = "hola" + str(contador)
        ws.send(strTemp)
        # gevent.sleep yields to the hub. time.sleep would block every other
        # connection, because this script does not monkey-patch the stdlib.
        gevent.sleep(1)
        contador = contador + 1

# Serve `app` on every interface, port 10000. WebSocketHandler handles the
# websocket upgrade; app reads the socket from environ['wsgi.websocket'].
server = pywsgi.WSGIServer(
    ('', 10000),
    app,
    handler_class=WebSocketHandler,
)
server.serve_forever()

The problem I am facing is joining the two pieces of code: I am not able to make myStreamListener run inside a separate 'thread' alongside the websocket server.

How can I do this? One approach might be an intermediate buffer, but that is beyond my Python skills.


Solution

  • The StreamListener runs in a gevent greenlet and sends the parsed coordinates to every websocket connected to the server.

    Tested with the iocat client: iocat localhost:10000

    import gevent
    import gevent.monkey
    gevent.monkey.patch_all()
    
    from geventwebsocket.handler import WebSocketHandler
    from gevent import pywsgi
    
    from tweepy.streaming import StreamListener
    from tweepy import OAuthHandler
    from tweepy import Stream
    import json
    
    # Twitter API credentials, passed to OAuthHandler / set_access_token in
    # MyStreamListener.__init__. Left empty here: fill them in before running.
    access_token = ""
    access_token_secret = ""
    consumer_key = ""
    consumer_secret = ""
    
    
    class MyStreamListener(StreamListener):
        def __init__(self):
            self.sockets = []
            auth = OAuthHandler(consumer_key, consumer_secret)
            auth.set_access_token(access_token, access_token_secret)
            self.stream = Stream(auth, self)
    
        def add_socket(self, ws):
            self.sockets.append(ws)
    
        def run(self):
            try:
                self.stream.filter(track="#linux")
            except Exception:
                self.stream.disconnect()
    
        def start(self):
            gevent.spawn(self.run)
    
        def send(self, ws, coordinates):
            try:
                ws.send(json.dumps(coordinates))
            except Exception:
                # the web socket die..
                self.sockets.remove(ws)
    
        def on_data(self, data):
            decoded = json.loads(data)
            if decoded.get("coordinates", None) is not None:
                coordinates = decoded["coordinates"]["coordinates"]
                for ws in self.sockets:
                    gevent.spawn(self.send, ws, coordinates)
            return True
    
        def on_error(self, status):
            print "Error", status
    
        def on_timeout(self):
            print "tweepy timeout.. wait 30 seconds"
            gevent.sleep(30)
    
    # One shared listener for the whole process. start() spawns the tweepy
    # stream in a background greenlet, so it returns immediately and the
    # websocket server below can start.
    stream_listener = MyStreamListener()
    stream_listener.start()
    
    
    def app(environ, start_response):
        """WSGI app: subscribe the client's websocket to the tweet broadcast.

        Keeps the connection open, polling cooperatively, until the client
        closes it. It then unregisters the socket so the listener stops
        broadcasting to it. Requests that were not upgraded to a websocket
        get a 400 response instead of failing with KeyError.
        """
        ws = environ.get('wsgi.websocket')
        if ws is None:
            start_response('400 Bad Request', [('Content-Type', 'text/plain')])
            return [b'Expected a websocket connection\n']
        stream_listener.add_socket(ws)
        try:
            while not ws.closed:
                gevent.sleep(0.1)
        finally:
            # Without this, closed sockets stayed registered until some later
            # send happened to fail on them.
            if ws in stream_listener.sockets:
                stream_listener.sockets.remove(ws)
    
    # Serve `app` on every interface, port 10000. WebSocketHandler handles
    # the websocket upgrade; app reads the socket from
    # environ['wsgi.websocket'].
    server = pywsgi.WSGIServer(
        ('', 10000),
        app,
        handler_class=WebSocketHandler,
    )
    server.serve_forever()