Search code examples
Tags: python, redis, twisted, server-sent-events

Twisted SSE server subscribed to Redis via pubsub


I'm trying to build a server in Twisted that lets clients connect using Server-Sent Events (SSE). I would also like this server to listen to Redis and, whenever a message arrives, push it to the connected SSE clients.

I have the SSE server working. I know how to subscribe to Redis. I can't figure out how to have both pieces running without blocking each other.

I'm aware of https://github.com/leporo/tornado-redis and https://github.com/fiorix/txredisapi, which were recommended in related questions, but I don't see how to apply them here.

How to solve this? Could you help with both: conceptual tips and code snippets?

My Twisted SSE server code:

# coding: utf-8
from twisted.web import server, resource
from twisted.internet import reactor


class Subscribe(resource.Resource):
    """Leaf resource that streams Server-Sent Events to connected clients.

    Each GET request is left open (``NOT_DONE_YET``) and tracked in
    ``sse_conns`` until the request finishes or its connection is lost.
    """

    isLeaf = True
    # Class-level attribute: the same set is shared by all instances.
    sse_conns = set()

    def render_GET(self, request):
        """Start an event stream on ``request`` and keep the response open."""
        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
        # Empty write; presumably here to push the response headers out
        # before the first event is available.
        request.write("")
        self.add_conn(request)
        return server.NOT_DONE_YET

    def add_conn(self, conn):
        """Track ``conn`` and drop it automatically once it finishes."""
        self.sse_conns.add(conn)
        finished = conn.notifyFinish()
        # The Deferred from notifyFinish() fires with None (or a Failure on
        # disconnect), never with the request itself, so ``conn`` has to be
        # bound explicitly. Returning None also consumes the Failure.
        finished.addBoth(lambda _result: self.rm_conn(conn))

    def rm_conn(self, conn):
        """Stop tracking ``conn``; a no-op if it is not tracked."""
        self.sse_conns.discard(conn)

    def broadcast(self, event):
        """Send ``event`` as one SSE ``data:`` message to every open client."""
        # One "data:" line followed by a blank line terminates the event.
        payload = "data: {}\r\n".format(event) + '\r\n'
        # Iterate over a snapshot so that a removal triggered while writing
        # cannot invalidate the iteration.
        for conn in list(self.sse_conns):
            conn.write(payload)


if __name__ == "__main__":
    # Serve the SSE resource on TCP port 9000 and hand control to the reactor.
    sub = Subscribe()
    site = server.Site(sub)
    reactor.listenTCP(9000, site)
    reactor.run()

My Redis subscribe code:

import redis


# NOTE: this rebinds the name `redis` from the imported module to a client
# instance built from the URL; later uses of `redis` in this snippet refer
# to that client, not to the module.
redis = redis.StrictRedis.from_url('redis://localhost:6379')


class RedisSub(object):
    """Subscribe to 'foobar-channel' and print whatever arrives on it."""

    def __init__(self):
        # Create the pub/sub handle from the module-level client and
        # subscribe straight away.
        self.pubsub = handle = redis.pubsub()
        handle.subscribe('foobar-channel')

    def listen(self):
        """Print every item yielded by the subscription.

        NOTE(review): the iteration presumably blocks while waiting for
        messages - confirm before calling it from the reactor thread.
        """
        for message in self.pubsub.listen():
            print(str(message))

Solution

  • This is what works for me.

    I ended up using the txredis library, with a small change to RedisClient: a subclass that adds minimal subscribe capabilities.

    # coding: utf-8
    import os
    import sys
    import weakref
    
    from txredis.client import RedisClient
    
    from twisted.web import server, resource
    from twisted.internet import reactor, protocol, defer
    from twisted.python import log
    
    from utils import cors, redis_conf_from_url
    
    
    # Send Twisted's log output to stdout.
    log.startLogging(sys.stdout)
    
    # TCP port for the HTTP/SSE server; taken from the PORT environment
    # variable, falling back to 9000.
    PORT = int(os.environ.get('PORT', 9000))
    # Redis connection settings parsed from REDISCLOUD_URL (local default
    # otherwise). The result is read below via 'host', 'port' and 'password'.
    REDIS_CONF = redis_conf_from_url(os.environ.get('REDISCLOUD_URL', 'redis://localhost:6379'))
    # Name of the Redis pub/sub channel to subscribe to.
    REDIS_SUB_CHANNEL = 'votes'
    
    
    class RedisBroadcaster(RedisClient):
        """Redis client that forwards pub/sub messages to an SSE broadcaster.

        ``sse_connector`` must be set (on the class or on an instance) to an
        object exposing ``broadcast(message)`` before messages arrive.
        """

        # Target for incoming messages; None until the application wires it up.
        sse_connector = None

        def subscribe(self, *channels):
            """Send a SUBSCRIBE command for the given channel names."""
            self._send('SUBSCRIBE', *channels)

        def handleCompleteMultiBulkData(self, reply):
            """Broadcast pub/sub messages; delegate every other reply.

            A pub/sub message arrives as ``[u"message", channel, payload]``;
            only the payload is forwarded.
            """
            # ``reply and`` guards against an empty or None multi-bulk reply.
            if reply and reply[0] == u"message":
                if self.sse_connector is None:
                    log.msg("Redis message dropped: no sse_connector configured")
                    return
                self.sse_connector.broadcast(reply[2])
            else:
                # Start the lookup after *this* class so that RedisClient's own
                # implementation is not skipped.
                super(RedisBroadcaster, self).handleCompleteMultiBulkData(reply)
    
    
    @defer.inlineCallbacks
    def redis_sub():
        """Connect a RedisBroadcaster to Redis and subscribe to the channel.

        Connection parameters come from REDIS_CONF; the channel name from
        REDIS_SUB_CHANNEL.
        """
        creator = protocol.ClientCreator(
            reactor,
            RedisBroadcaster,
            password=REDIS_CONF.get('password'),
        )
        host, port = REDIS_CONF['host'], REDIS_CONF['port']
        # Wait for the TCP connection before issuing SUBSCRIBE.
        client = yield creator.connectTCP(host, port)
        client.subscribe(REDIS_SUB_CHANNEL)
    
    
    class Subscribe(resource.Resource):
        """Leaf resource that streams Server-Sent Events to connected clients.

        Open requests are held in ``sse_conns`` and removed as soon as the
        request finishes or its connection is lost.
        """

        isLeaf = True
        # Class-level attribute: the same set is shared by all instances.
        # Weak references remain as a fallback; removal is done explicitly
        # in _forget().
        sse_conns = weakref.WeakSet()

        @cors
        def render_GET(self, request):
            """Start an event stream on ``request`` and keep the response open."""
            request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
            # Empty write; presumably here to push the response headers out
            # before the first event is available.
            request.write("")
            self.sse_conns.add(request)
            # Drop the request when it ends, not only when it is eventually
            # garbage-collected, so broadcast() does not write to closed
            # requests.
            request.notifyFinish().addBoth(self._forget, request)
            return server.NOT_DONE_YET

        def _forget(self, _result, request):
            """Stop tracking ``request``; also consumes a disconnect Failure."""
            self.sse_conns.discard(request)

        def broadcast(self, event):
            """Send ``event`` as one SSE ``data:`` message to every open client."""
            # One "data:" line followed by a blank line terminates the event.
            payload = "data: {}\r\n".format(event) + '\r\n'
            # Iterate over a snapshot so that a removal triggered while
            # writing cannot invalidate the iteration.
            for conn in list(self.sse_conns):
                conn.write(payload)
    
    
    if __name__ == "__main__":
        sub = Subscribe()
        # Every RedisBroadcaster forwards incoming messages to this resource.
        RedisBroadcaster.sse_connector = sub

        site = server.Site(sub)
        reactor.listenTCP(PORT, site)

        # Start the Redis subscription once the reactor is running.
        reactor.callLater(0, redis_sub)

        reactor.run()