I've implemented a Server-Sent Events API in my Django app to stream realtime updates from my backend to the browser. The backend is a Redis pubsub channel. My Django view looks like this:
def event_stream(request):
    """
    Stream worker events out to browser.
    """
    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
    return http.HttpResponse(listener, mimetype='text/event-stream')
And the events.Listener class that I'm returning as an iterator looks like this:
class Listener(object):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        else:
            raise ValueError('rcon_or_url must be a StrictRedis instance '
                             'or a Redis URL string')
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
            # Check whether the msg with last_event_id is still in the buffer.
            # If so, trim buffered_events to include only newer messages.
            if self.last_event_id:
                # Note that we're looping through the most recent messages
                # first here.
                counter = 0
                for msg in buffered_events:
                    if json.loads(msg)['id'] == self.last_event_id:
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]
            for msg in reversed(buffered_events):
                # Stream out oldest messages first
                yield to_sse({'data': msg})
        try:
            for msg in self.pubsub.listen():
                if msg['type'] == 'message':
                    yield to_sse(msg)
        finally:
            logging.info('Closing pubsub')
            self.pubsub.close()
            self.rcon.connection_pool.disconnect()
I'm able to successfully stream events out to the browser with this setup. However, the disconnect calls in the listener's finally block never actually run. I assume the generators are still camped out waiting for messages to come from the pubsub. As clients disconnect and reconnect, I can see the number of connections to my Redis instance climbing and never going down. Once it gets to around 1000, Redis starts freaking out and consuming all the available CPU.
I would like to be able to detect when the client is no longer listening and close the Redis connection(s) at that time.
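For what it's worth, a generator's finally clause only runs when the generator is resumed past it, explicitly close()d, or garbage collected; if the server abandons the response iterator without calling close() on it, nothing ever raises GeneratorExit at the yield, so cleanup never happens. A minimal standalone illustration of that behavior:

def stream():
    try:
        while True:
            yield "tick\n"
    finally:
        # Only runs when GeneratorExit is raised at the yield (via close()
        # or garbage collection), not when the consumer merely stops
        # calling next().
        print("cleanup")

gen = stream()
next(gen)    # generator is now suspended at the yield
gen.close()  # GeneratorExit is raised inside stream(); "cleanup" prints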
Things I've tried or thought about:
Final wrinkle: in production I'm using Gevent, so I can get away with keeping a lot of connections open at once. However, this connection leak occurs whether I'm using plain old 'manage.py runserver', the Gevent-monkeypatched runserver, or Gunicorn's gevent workers.
UPDATE: As of Django 1.5, you'll need to return a StreamingHttpResponse instance if you want to lazily stream things out as I'm doing in this question/answer.
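A sketch of the view against Django 1.5+ (note that the mimetype argument was also deprecated in favor of content_type around the same release):

from django.http import StreamingHttpResponse

def event_stream(request):
    listener = events.Listener(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
    return StreamingHttpResponse(listener, content_type='text/event-stream')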
ORIGINAL ANSWER BELOW
After a lot of banging on things and reading framework code, I've found what I think is the right answer to this question.
Here's the new code. First the Django view:
def event_stream(request):
    """
    Stream worker events out to browser.
    """
    return events.SSEResponse(
        settings.EVENTS_PUBSUB_URL,
        channels=[settings.EVENTS_PUBSUB_CHANNEL],
        buffer_key=settings.EVENTS_BUFFER_KEY,
        last_event_id=request.META.get('HTTP_LAST_EVENT_ID')
    )
And the Listener class that does the work, along with a helper function to format the SSEs and an HTTPResponse subclass that lets the view be a little cleaner:
import json
import logging

import redis
from django.conf import settings
from django.http import HttpResponse


class Listener(object):
    def __init__(self,
                 rcon_or_url=settings.EVENTS_PUBSUB_URL,
                 channels=None,
                 buffer_key=settings.EVENTS_BUFFER_KEY,
                 last_event_id=None):
        if isinstance(rcon_or_url, redis.StrictRedis):
            self.rcon = rcon_or_url
        elif isinstance(rcon_or_url, basestring):
            self.rcon = redis.StrictRedis(**utils.parse_redis_url(rcon_or_url))
        else:
            raise ValueError('rcon_or_url must be a StrictRedis instance '
                             'or a Redis URL string')
        if channels is None:
            channels = [settings.EVENTS_PUBSUB_CHANNEL]
        self.channels = channels
        self.buffer_key = buffer_key
        self.last_event_id = last_event_id
        self.pubsub = self.rcon.pubsub()
        self.pubsub.subscribe(channels)

        # Send a superfluous message down the pubsub to flush out stale
        # connections.
        for channel in self.channels:
            # Use buffer_key=None since these pings never need to be
            # remembered and replayed.
            sender = Sender(self.rcon, channel, None)
            sender.publish('_flush', tags=['hidden'])

    def __iter__(self):
        # If we've been initted with a buffer key, then get all the events off
        # that and spew them out before blocking on the pubsub.
        if self.buffer_key:
            buffered_events = self.rcon.lrange(self.buffer_key, 0, -1)
            # Check whether the msg with last_event_id is still in the buffer.
            # If so, trim buffered_events to include only newer messages.
            if self.last_event_id:
                # Note that we're looping through the most recent messages
                # first here.
                counter = 0
                for msg in buffered_events:
                    if json.loads(msg)['id'] == self.last_event_id:
                        break
                    counter += 1
                buffered_events = buffered_events[:counter]
            for msg in reversed(buffered_events):
                # Stream out oldest messages first
                yield to_sse({'data': msg})
        for msg in self.pubsub.listen():
            if msg['type'] == 'message':
                yield to_sse(msg)

    def close(self):
        self.pubsub.close()
        self.rcon.connection_pool.disconnect()


class SSEResponse(HttpResponse):
    def __init__(self, rcon_or_url, channels, buffer_key=None,
                 last_event_id=None, *args, **kwargs):
        self.listener = Listener(rcon_or_url, channels, buffer_key,
                                 last_event_id)
        super(SSEResponse, self).__init__(self.listener,
                                          mimetype='text/event-stream',
                                          *args, **kwargs)

    def close(self):
        """
        This will be called by the WSGI server at the end of the request, even
        if the client disconnects midstream. Unless you're using Django's
        runserver, in which case you should expect to see Redis connections
        build up until http://bugs.python.org/issue16220 is fixed.
        """
        self.listener.close()
def to_sse(msg):
    """
    Given a Redis pubsub message that was published by a Sender (i.e., it has
    a JSON body with time, message, title, tags, and id), return a
    properly-formatted SSE string.
    """
    data = json.loads(msg['data'])

    # According to the SSE spec, lines beginning with a colon should be
    # ignored. We can use that as a way to force zombie listeners to try
    # pushing something down the socket and clean up their redis connections
    # when they get an error.
    # See http://dev.w3.org/html5/eventsource/#event-stream-interpretation
    if data['message'] == '_flush':
        return ":\n"  # Administering colonic!

    if 'id' in data:
        out = "id: " + data['id'] + '\n'
    else:
        out = ''
    if 'name' in data:
        # The SSE field for a custom event type is "event:"; browsers ignore
        # unrecognized field names like "name:".
        out += 'event: ' + data['name'] + '\n'
    payload = json.dumps({
        'time': data['time'],
        'message': data['message'],
        'tags': data['tags'],
        'title': data['title'],
    })
    out += 'data: ' + payload + '\n\n'
    return out
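The Sender class referenced above isn't shown in this answer. For completeness, here's a hypothetical sketch of what it might look like, inferred from the call sites and from the JSON body that to_sse() expects; the uuid-based id, the buffer_len cap, and the lpush/ltrim bookkeeping are my assumptions, not the original implementation:

import json
import time
import uuid


class Sender(object):
    def __init__(self, rcon, channel, buffer_key=None, buffer_len=100):
        self.rcon = rcon
        self.channel = channel
        self.buffer_key = buffer_key
        self.buffer_len = buffer_len  # assumed cap on replay history

    def publish(self, message, title=None, tags=None):
        # Build the JSON body that to_sse() expects: time, message, title,
        # tags, and id.
        payload = json.dumps({
            'id': uuid.uuid4().hex,
            'time': time.time(),
            'message': message,
            'title': title,
            'tags': tags or [],
        })
        self.rcon.publish(self.channel, payload)
        if self.buffer_key:
            # lpush so index 0 is the most recent message, matching the
            # replay logic in Listener.__iter__, and trim so the buffer
            # doesn't grow without bound.
            self.rcon.lpush(self.buffer_key, payload)
            self.rcon.ltrim(self.buffer_key, 0, self.buffer_len - 1)

With something like that in place, a worker publishes an event with Sender(rcon, settings.EVENTS_PUBSUB_CHANNEL, settings.EVENTS_BUFFER_KEY).publish('build finished'), and reconnecting browsers catch up from the buffer via the Last-Event-ID header.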