Tags: python, rest, microservices, event-bus

Building a Microservices Event Bus and REST API (Python / Flask)


Background

I'm building my first application using a microservice architecture. I'll be working mostly in Python using Flask.

I'm considering implementing an event/message bus to coordinate actions between services. A few services I intend to implement are: Auth, Users, Posts, and Chat. The application has two entities ('User' and 'Group') that are used by almost every service. I have a separate database for each service, and each database has its own users and groups tables to manage the user/group data specific to that service. When I think about an event like creating a new user, each service will need to create a new entry in its users table, which is why I'm considering using an event bus.
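
To make the idea concrete, the kind of event I have in mind would look something like this (a hypothetical payload; the stream and field names are just illustrative):

```python
# Hypothetical "created" event on a "users" stream; every service with
# its own users table would consume this and insert a matching row.
user_created = {
    "stream": "users",
    "action": "created",
    "data": {"user_id": "42", "username": "alice"},
}

print(user_created["stream"], user_created["action"])  # users created
```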

I read this post which discusses CQRS and using HTTP (REST) for external communication between services, while using an event bus for internal communication. Services process (HTTP) requests, and emit events about data changes (e.g. the creation of a new User by the Auth service). Other services consume the events which may trigger other processes (and more events).

Question

Where I'm hung up is how to actually implement (in Python) a service that listens both for HTTP requests and for new events on a set of subscribed channels. I get that you need a tool like Redis/RabbitMQ, but is it possible to handle both types of requests in the same process, or do you need to run two servers (one for REST requests and the other for event handling)?

Also, if you have any comments on the general approach/architecture described above, I'm all ears.


Solution

  • So, after doing some more research and building a prototype: yes, a single service can handle both HTTP requests and events from a message broker. However, it requires running two separate processes (one web-server process to listen for HTTP, and one event process to listen to the message broker).

    Here's the architecture I developed for my prototype (the diagram itself isn't reproduced here):

    The core modules contain the meat of a service: all of the code that actually changes data. The HTTP Server and the Event Worker both call methods from the core modules. Neither the HTTP Server nor the Event Worker produces events; only the core modules produce events.

    Here's a file structure:

    Project
     |-Foo
     |  |- foo.py
     |  |- web.py
     |  |- worker.py
     |  |- revent.py
     |-Bar
     |  |- bar.py
     |  |- web.py
     |  |- worker.py
     |  |- revent.py
    

    The web.py files are simple Flask apps:

    # Bar/web.py
    from flask import Flask, request
    from bar import Bar
    
    
    app = Flask(__name__)
    
    @app.route('/bar')
    def bar():
        return Bar().bar_action("test")
    
    if __name__ == "__main__":
        app.run(port=5001, debug=True)
    

    For both the event worker and the core modules, I used a module revent.py (redis + event) that I created. It consists of three classes:

    1. Event -- an abstraction of an event
    2. Producer -- a service/class used by core modules to publish events onto their event stream
    3. Worker -- an event server that maps events to handler functions (much like routing HTTP endpoints in Flask) and runs the event loop that listens for them
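
    The Worker described in point 3 routes events much like Flask routes URLs: under the hood it's just a nested dict from stream and action to a handler function. A minimal standalone sketch of that routing idea (no Redis involved):

    ```python
    # Minimal sketch of the Worker's routing table: stream -> action -> handler
    _events = {}

    def register(stream, action, func):
        _events.setdefault(stream, {})[action] = func

    def dispatch(stream, action, **data):
        handler = _events.get(stream, {}).get(action)
        if handler:
            return handler(**data)

    register("bar", "update", lambda **d: f"handled {d}")
    print(dispatch("bar", "update", test="True"))  # handled {'test': 'True'}
    ```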

    Under the hood, this module uses Redis streams. I'll paste the code for revent.py below.

    But first, here is a sample bar.py, which is called by the HTTP server and the worker to do work, and which emits events about that work to the "bar" stream in Redis.

    # Bar/bar.py
    from revent import Producer
    
    class Bar():
        ep = Producer("bar", host="localhost", port=6379, db=0)
    
        @ep.event("update")  # publishes an "update" event to the "bar" stream after each call
        def bar_action(self, foo, **kwargs):
            print("BAR ACTION")
            # Manual alternative to the decorator:
            # Bar.ep.send_event("update", {"test": str(True)})
            return "BAR ACTION"
    
    if __name__ == '__main__':
        Bar().bar_action("test", test="True")
    

    Finally, here's a sample worker (Foo/worker.py) that listens for events on the "bar" stream:

    # Foo/worker.py
    from revent import Worker
    
    worker = Worker()
    
    @worker.on('bar', "update")
    def test(foo, test=False):
        # Field values arrive from Redis as strings, so compare the string
        # itself; bool("False") would evaluate to True.
        if str(test) == "True":
            print('tested')
        else:
            print('test')
    
    if __name__ == "__main__":
        worker.listen(host='127.0.0.1', port=6379, db=0)
    
    

    As promised, here's the code for the revent.py module I built. It would probably be worth publishing a more polished version of this to PyPI, but for now I'm just using a symlink to keep my two copies in sync.

    # revent.py
    import redis
    from datetime import datetime
    import functools
    
    class Worker:
        # self._events maps stream -> action -> handler, e.g.:
        # {
        #   "bar": {
        #       "update": Foo.foo_action
        #   },
        # }

        def __init__(self):
            self._events = {}
    
    
        def on(self, stream, action, **options):
            """
            Wrapper to register a function to an event
            """
            def decorator(func):
                self.register_event(stream, action, func, **options)
                return func
            return decorator
    
        def register_event(self, stream, action, func, **options):
            """
            Map an event to a function
            """
            self._events.setdefault(stream, {})[action] = func
    
        def listen(self, host, port, db):
            """
            Main event loop:
            establish a redis connection from the passed parameters,
            wait for events on the registered streams, and dispatch
            each one to its handler.
            """
            self._r = redis.Redis(host=host, port=port, db=db)
            # xread expects a dict of {stream: last_seen_id}; "$" means
            # "only events newer than now". Track the last-seen ID per
            # stream so events arriving while we dispatch aren't missed.
            last_ids = {stream: "$" for stream in self._events}
            while True:
                # block=0 waits indefinitely for the next event
                for stream, messages in self._r.xread(last_ids, count=None, block=0):
                    for event_id, data in messages:
                        last_ids[stream.decode("utf-8")] = event_id
                        # Call the function mapped to this event
                        self._dispatch([[stream, [(event_id, data)]]])
    
        def _dispatch(self, event):
            """
            Call a function given an event
    
            If the event has been registered, the registered function will be called with the passed params.
            """
            e = Event(event=event)
            if e.action in self._events.get(e.stream, {}):
                func = self._events[e.stream][e.action]
                print(f"{datetime.now()} - Stream: {e.stream} - {e.event_id}: {e.action} {e.data}")
                return func(**e.data)
    
    
    class Event():
        """
        Abstraction for an event 
        """
        def __init__(self, stream="", action="", data=None, event=None):
            self.stream = stream
            self.action = action
            # Avoid a shared mutable default argument
            self.data = data if data is not None else {}
            self.event_id = None
            if event:
                self.parse_event(event)
    
        def parse_event(self, event):
            # event = [[b'bar', [(b'1594764770578-0', {b'action': b'update', b'test': b'True'})]]]
            self.stream = event[0][0].decode('utf-8')
            self.event_id = event[0][1][0][0].decode('utf-8')
            self.data = event[0][1][0][1]
            self.action = self.data.pop(b'action').decode('utf-8')
            params = {}
            for k, v in self.data.items():
                params[k.decode('utf-8')] = v.decode('utf-8')
            self.data = params
    
        def publish(self, r):
            # Flatten the event into a single dict of stream fields
            body = {"action": self.action, **self.data}
            r.xadd(self.stream, body)
    
    class Producer:
        """
        Abstraction for a service (module) that publishes events about itself
    
        Manages stream information and can publish events
        """
        def __init__(self, stream_name, host, port, db):
            self.stream = stream_name
            # Use the passed connection parameters rather than hardcoding them
            self._r = redis.Redis(host=host, port=port, db=db)
    
        def send_event(self, action, data):
            e = Event(stream=self.stream, action=action, data=data)
            e.publish(self._r)
    
        def event(self, action):
            """
            Decorator: publish an `action` event after the wrapped method
            runs, using the method's arguments as the event payload.
            """
            def decorator(func):
                @functools.wraps(func)
                def wrapped(*args, **kwargs):
                    result = func(*args, **kwargs)
                    # Map positional args (after self) onto their parameter
                    # names so they can be sent as key/value event fields
                    arg_names = func.__code__.co_varnames[1:func.__code__.co_argcount]
                    for name, value in zip(arg_names, args[1:]):
                        kwargs[name] = value
                    self.send_event(action, kwargs)
                    return result
                return wrapped
            return decorator
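
    To see exactly what Event.parse_event unpacks, here's a standalone walk-through of the raw shape redis-py's xread returns, using the sample payload from the comment in parse_event:

    ```python
    # Raw xread result: a list of [stream, [(event_id, fields), ...]] pairs,
    # with every key and value as bytes
    raw = [[b"bar", [(b"1594764770578-0", {b"action": b"update", b"test": b"True"})]]]

    stream = raw[0][0].decode("utf-8")              # "bar"
    event_id = raw[0][1][0][0].decode("utf-8")      # "1594764770578-0"
    fields = dict(raw[0][1][0][1])                  # copy so we can pop safely
    action = fields.pop(b"action").decode("utf-8")  # "update"
    data = {k.decode("utf-8"): v.decode("utf-8") for k, v in fields.items()}

    print(stream, event_id, action, data)  # bar 1594764770578-0 update {'test': 'True'}
    ```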
    
    
    

    So, putting it all together: the foo.py and bar.py modules do the actual work of the Foo and Bar services respectively. Their methods are called by the HTTP server and the event worker to handle requests/events. In doing that work, these modules emit events about their state changes so that other interested services can act accordingly. The HTTP server is just a normal web app (using e.g. Flask). The event worker is similar in concept to a web server, except that it listens for events in Redis instead of HTTP requests. Both processes (the web server and the event worker) need to run separately, so if you're developing locally, you need to run them in different terminal windows or with a container/process orchestrator.
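
    For local development, one option is a tiny helper script that launches both processes and waits on them. This is just a sketch; run_services.py is a hypothetical name, and the web.py/worker.py paths are whatever your layout uses:

    ```python
    # run_services.py -- hypothetical helper: launch the web server and the
    # event worker as two separate OS processes and wait for both to exit
    import subprocess
    import sys

    def run_all(commands):
        """Start each command as its own process; return their exit codes."""
        procs = [subprocess.Popen(cmd) for cmd in commands]
        return [p.wait() for p in procs]

    if __name__ == "__main__":
        run_all([
            [sys.executable, "web.py"],     # Flask HTTP server
            [sys.executable, "worker.py"],  # revent event worker
        ])
    ```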

    That was a lot. I hope it helps someone, let me know in the comments if you have questions.

    Edit

    I uploaded the revent.py file to PyPI as a package -- redisevents. I'll add more documentation on how to use/extend it later this week.