My goal is to send the stream read from Twitter with tweepy and serve its content over a websocket using gevent. I have solved the two parts of the problem in two simple scripts based on the official documentation.
1) On one side, I can read the Twitter stream using tweepy:
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
# Twitter OAuth credentials passed to OAuthHandler below.
# The empty strings are placeholders -- fill them in before running.
access_token = ""
access_token_secret = ""
consumer_key = ""
consumer_secret = ""
class myStreamListener(StreamListener):
    """Stream listener that prints the coordinates of geotagged tweets."""

    def on_data(self, data):
        """Parse one raw JSON payload and print its coordinates, if any.

        Returns True after every payload, including those that carry no
        coordinates.
        """
        decoded = json.loads(data)
        # Use .get() so a payload without a "coordinates" key is skipped
        # instead of raising KeyError.
        geo = decoded.get("coordinates")
        if geo is not None:
            print(geo["coordinates"])
        return True

    def on_error(self, status):
        """Print the error status reported for the stream."""
        print(status)
if __name__ == '__main__':
    # Authenticate with the module-level credentials.
    oauth = OAuthHandler(consumer_key, consumer_secret)
    oauth.set_access_token(access_token, access_token_secret)

    # Attach a fresh listener to the stream and filter by location using
    # a bounding box that spans the whole world.
    twitter_stream = Stream(oauth, myStreamListener())
    world_box = [-180, -90, 180, 90]
    twitter_stream.filter(locations=world_box)
This prints the required information to standard output.
2) On the other side, I took a dummy gevent example in order to create a 'thread' capable of serving the data to a web client:
from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi
import gevent
import time
def app(environ, start_response):
    """WSGI application that sends a counter message over the websocket.

    Takes the websocket from ``environ['wsgi.websocket']`` and sends
    "hola0", "hola1", ... in an endless loop, pausing one second between
    messages. ``start_response`` is unused.
    """
    ws = environ['wsgi.websocket']
    contador = 0
    while True:
        ws.send("hola" + str(contador))
        # Cooperative sleep: this script does not monkey-patch, so
        # time.sleep() would block every other greenlet (and therefore every
        # other client) for the whole second.
        gevent.sleep(1)
        contador += 1
# Serve the websocket application on port 10000 (empty host string) until
# the process is stopped.
listen_address = ('', 10000)
server = pywsgi.WSGIServer(
    listen_address,
    app,
    handler_class=WebSocketHandler,
)
server.serve_forever()
The problem I am facing is joining the two pieces of code; I am not able to run myStreamListener inside a tweepy 'thread'.
How can I do this? One approach might be to use an intermediate buffer, but this is beyond my Python skills.
The StreamListener runs in a gevent greenlet and sends the parsed coordinates to all the websockets connected to the server.
Tested with `iocat localhost:10000`:
import gevent
import gevent.monkey
gevent.monkey.patch_all()
from geventwebsocket.handler import WebSocketHandler
from gevent import pywsgi
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
# Twitter OAuth credentials read by MyStreamListener.__init__.
# The empty strings are placeholders -- fill them in before running.
access_token = ""
access_token_secret = ""
consumer_key = ""
consumer_secret = ""
class MyStreamListener(StreamListener):
    """Tweepy listener that broadcasts tweet coordinates to websockets.

    The listener owns its own ``Stream`` (authenticated with the
    module-level credentials) and a list of registered websockets. Each
    payload that carries coordinates is JSON-encoded and sent to every
    registered websocket from its own greenlet.
    """

    def __init__(self):
        """Create the authenticated stream and an empty socket registry."""
        # Run the base-class initialiser so any state it sets up exists.
        super(MyStreamListener, self).__init__()
        self.sockets = []
        auth = OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        self.stream = Stream(auth, self)

    def add_socket(self, ws):
        """Register a websocket so it receives future coordinates."""
        self.sockets.append(ws)

    def run(self):
        """Consume the filtered stream; disconnect if it fails."""
        try:
            # ``track`` must be a list of phrases: tweepy comma-joins the
            # iterable, so a bare string would be split into characters.
            self.stream.filter(track=["#linux"])
        except Exception as exc:
            # Report the failure instead of dying silently, then clean up.
            print("Stream error: %s" % (exc,))
            self.stream.disconnect()

    def start(self):
        """Run the stream consumer in a background greenlet."""
        gevent.spawn(self.run)

    def send(self, ws, coordinates):
        """Send JSON-encoded coordinates to one websocket.

        A websocket whose send fails is dropped from the registry.
        """
        try:
            ws.send(json.dumps(coordinates))
        except Exception:
            # The websocket died. Another in-flight send for the same socket
            # may already have removed it, so tolerate its absence.
            try:
                self.sockets.remove(ws)
            except ValueError:
                pass

    def on_data(self, data):
        """Parse one raw JSON payload and fan its coordinates out.

        Returns True after every payload, including those that carry no
        coordinates.
        """
        decoded = json.loads(data)
        geo = decoded.get("coordinates")
        if geo is not None:
            coordinates = geo["coordinates"]
            # One greenlet per socket so a slow or dead client cannot stall
            # the others.
            for ws in self.sockets:
                gevent.spawn(self.send, ws, coordinates)
        return True

    def on_error(self, status):
        """Print the error status reported for the stream."""
        # Single-argument form prints "Error <status>" on Python 2 and 3.
        print("Error %s" % (status,))

    def on_timeout(self):
        """Print a notice and pause for 30 seconds after a timeout."""
        print("tweepy timeout.. wait 30 seconds")
        gevent.sleep(30)
# Module-level listener shared by every websocket handled by app().
stream_listener = MyStreamListener()
# Start consuming the stream in a background greenlet.
stream_listener.start()
def app(environ, start_response):
    """WSGI application that registers the websocket with the listener.

    The websocket from ``environ['wsgi.websocket']`` is added to
    ``stream_listener``. The handler then polls every 0.1 seconds and
    returns once the websocket is closed. ``start_response`` is unused.
    """
    websocket = environ['wsgi.websocket']
    stream_listener.add_socket(websocket)
    while True:
        if websocket.closed:
            break
        gevent.sleep(0.1)
# Serve the websocket application on port 10000 (empty host string) until
# the process is stopped.
listen_address = ('', 10000)
server = pywsgi.WSGIServer(
    listen_address,
    app,
    handler_class=WebSocketHandler,
)
server.serve_forever()