How to generate server-sent events for status change notifications in a Python web app?


I have a web app written in CherryPy: a user uploads a file, then some lengthy operation begins, passing through several stages. I want notifications for these stages to be pushed to all the connected clients. But I don't know how to communicate between processes. I guess I would have to launch the lengthy operation in a separate process, but then I don't know how to pass the "advanced to stage N" messages to the "server-sending function".

Conceptually, it would be something like this:

SSEtest.py:

from pathlib import Path
from time import sleep
import cherrypy


def lengthy_operation(name, stream):
    for stage in range(10):
        print(f'stage {stage}... ', end='')
        sleep(2)
        print('done')
    print('finished')


class SSETest():

    @cherrypy.expose
    def index(self):
        return Path('SSEtest.html').read_text()

    @cherrypy.expose
    def upload(self, file):
        name = file.filename.encode('iso-8859-1').decode('utf-8')
        lengthy_operation(name, file.file)
        return 'OK'

    @cherrypy.expose
    def stage(self):
        cherrypy.response.headers['Content-Type'] = 'text/event-stream;charset=utf-8'

        def lengthy_operation():
            for stage in range(5):
                yield f'data: stage {stage}... \n\n'
                sleep(2)
                yield 'data: done\n\n'
            yield 'data: finished\n\n'

        return lengthy_operation()

    stage._cp_config = {'response.stream': True, 'tools.encode.encoding': 'utf-8'}


cherrypy.quickstart(SSETest())

SSEtest.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <title>SSE Test</title>
</head>
<body>
<h1>SSE Test</h1>
<div>
    <form id="load_file_form" action="" enctype="multipart/form-data">
        <label for="load_file">Load a file: </label>
        <input type="file" id="load_file" name="load_file">
        <progress max="100" value="0" id="progress_bar"></progress>
    </form>
</div>

<div id="status_messages">
<h3>Stages:</h3>
</div>

<script>
    const load_file = document.getElementById('load_file');
    const progress_bar = document.getElementById('progress_bar');

    function update_progress_bar(event) {
        if (event.lengthComputable) {
            progress_bar.value = Math.round((event.loaded/event.total)*100);
        }
    }

    load_file.onchange = function (event) {
        let the_file = load_file.files[0];
        let formData = new FormData();
        let connection = new XMLHttpRequest();

        formData.append('file', the_file, the_file.name);

        connection.open('POST', 'upload', true);
        connection.upload.onprogress = update_progress_bar;
        connection.onload = function (event) {
            if (connection.status != 200) {
                alert('Error! ' + event);
            }
        };

        connection.send(formData);
    };

    const status_messages = document.getElementById("status_messages");
    const sse = new EventSource("stage");

    sse.onopen = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = "Connection established: " + event.type;
        status_messages.appendChild(new_message);
    };

    sse.onmessage = function (event) {
        let new_message = document.createElement("p");
        new_message.innerHTML = event.data;
        status_messages.appendChild(new_message);
    };

    sse.onerror = function(event) {
        let new_message = document.createElement("p");
        // readyState lives on the EventSource itself, not on the error event.
        if (sse.readyState == EventSource.CLOSED) {
            new_message.innerHTML = "Connection closed";
        } else {
            new_message.innerHTML = "Error: " + event.type;
        }
        status_messages.appendChild(new_message);
    };

</script>
</body>
</html>

I need lengthy_operation() to be called only once, when the file is uploaded, and I need the messages it generates to be sent to all the clients. Right now it only works with the local function, which is not what I want. How can I use the outer function and pass its messages into the stage() method?


Solution

  • I want notifications for these stages to be pushed to all the connected clients.

    I suspect in the end you will want more control than that, but I will answer your question as it was asked. Later, you may want to build on the example below and filter the broadcasted notifications based on the user's session, or based on a certain starting timestamp, or some other relevant concept.

    Each "connected client" is effectively hanging on a long-running request to /stage which the server will use to stream events to the client. In your example, each client will begin that request immediately and leave it open until the server terminates the stream. You can also close the stream from the client using close() on the EventSource.
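
To make the stream that each client receives more concrete, here is a rough sketch of a parser for the `data:` framing used in this question. The function name `parse_sse` is hypothetical, and the parser is deliberately simplified: it handles `data:` lines, comment lines and blank-line event boundaries only, and ignores the other fields (`event`, `id`, `retry`) that a browser's EventSource understands.

```python
from typing import Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each event in a text/event-stream body."""
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip('\r\n')
        if line == '':
            # A blank line terminates the current event.
            if data:
                yield '\n'.join(data)
                data = []
        elif line.startswith(':'):
            continue  # comment line, ignored by clients
        elif line.startswith('data:'):
            value = line[len('data:'):]
            # One optional space after the colon is not part of the payload.
            data.append(value[1:] if value.startswith(' ') else value)
        # Other fields (event, id, retry) are ignored in this sketch.
```

Something like this can be handy for checking what /stage emits without opening a browser, for example by feeding it the lines of a saved response body.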

    Basic Solution

    You asked how to have the /stage handler broadcast or mirror its events to all of the currently-connected clients. There are many ways you could accomplish this, but in a nutshell you want the lengthy_operation function to either post events to all /stage handler readers or to a persistent shared location from which all /stage handlers read. I will show a way to encapsulate the first idea described above.
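
For contrast, the second idea (a persistent shared location that every /stage handler reads from) could look roughly like the sketch below. The class name `EventLog` and its methods are hypothetical, and it uses only the standard library. One behavioral difference worth noting: a client that connects late can replay earlier messages, which the queue-based approach in the rest of this answer does not do.

```python
import threading
from typing import Iterator, List


class EventLog:
    """Append-only list of messages that any number of readers can follow."""

    def __init__(self) -> None:
        self._messages: List[str] = []
        self._closed = False
        self._changed = threading.Condition()

    def append(self, message: str) -> None:
        with self._changed:
            self._messages.append(message)
            self._changed.notify_all()  # wake every waiting reader

    def close(self) -> None:
        with self._changed:
            self._closed = True
            self._changed.notify_all()

    def follow(self, start: int = 0) -> Iterator[bytes]:
        position = start
        while True:
            with self._changed:
                # Sleep until there is something new to read or the log is closed.
                self._changed.wait_for(
                    lambda: position < len(self._messages) or self._closed
                )
                pending = self._messages[position:]
                closed = self._closed
            # Yield outside the lock so a slow client cannot block writers.
            for message in pending:
                yield f'data: {message}\n\n'.encode('utf-8')
            position += len(pending)
            if closed:
                return
```

The rest of this answer follows the first idea instead.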

    Consider a generic stream event class that serializes to data: <some message>:

    class StreamEvent:
        def __init__(self, message: str) -> None:
            self.message = message
    
        def serialize(self) -> bytes:
            return f'data: {self.message}\n\n'.encode('utf-8')
    

    and a more specific derived case for file-related stream events:

    class FileStreamEvent(StreamEvent):
        def __init__(self, message: str, name: str):
            super().__init__(message)
            self.name = name
    
        def serialize(self) -> bytes:
            return f'data: file: {self.name}: {self.message}\n\n'.encode('utf-8')
    

    You can create an extremely primitive publish/subscribe type of container where /stage can then subscribe listeners and lengthy_operation() can publish StreamEvent instances to all listeners:

    class StreamSource:
        def __init__(self):
            self.listeners: List[Queue] = []
    
        def put(self, event: StreamEvent):
            for listener in self.listeners:
                listener.put_nowait(event)
    
        def get(self):
            listener = Queue()
            self.listeners.append(listener)
            try:
                while True:
                    event = listener.get()
                    yield event.serialize()
            finally:
                self.listeners.remove(listener)
    

    In StreamSource.get(), you likely want to create an end case (e.g. check for a "close" or "finish" event) to exit from the generic while True and you likely want to set a timeout on the blocking Queue.get() call. But for the sake of this example, I kept everything basic.
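
As a sketch of what such an end case might look like, the variant below publishes a sentinel object to tell every listener to stop. The names `CLOSE`, `ClosableStreamSource`, `subscribe()` and `stream()` are hypothetical, and messages are plain strings here to keep the example self-contained. Splitting registration (`subscribe()`) from iteration (`stream()`) is a design choice of this sketch: it means a listener is registered immediately rather than on the generator's first `next()` call.

```python
from queue import Empty, Queue
from typing import Iterator, List

# Hypothetical sentinel: publishing it tells every listener to stop.
CLOSE = object()


class ClosableStreamSource:
    def __init__(self) -> None:
        self.listeners: List[Queue] = []

    def subscribe(self) -> Queue:
        listener: Queue = Queue()
        self.listeners.append(listener)
        return listener

    def put(self, message: object) -> None:
        # Iterate over a copy so a listener removing itself cannot disturb the loop.
        for listener in list(self.listeners):
            listener.put_nowait(message)

    def close(self) -> None:
        self.put(CLOSE)

    def stream(self, listener: Queue, timeout: float = 1.0) -> Iterator[bytes]:
        try:
            while True:
                try:
                    message = listener.get(timeout=timeout)
                except Empty:
                    continue  # nothing arrived yet; a natural place for other checks
                if message is CLOSE:
                    break
                yield f'data: {message}\n\n'.encode('utf-8')
        finally:
            self.listeners.remove(listener)
```

With this shape, lengthy_operation() could call `close()` after its final message so that each open /stage response ends on its own.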

    Now, lengthy_operation() just needs a reference to a StreamSource:

    def lengthy_operation(events: StreamSource, name: str, stream: BinaryIO):
        for stage in range(10):
            events.put(FileStreamEvent(f'stage {stage}: begin', name))
            sleep(2)
            events.put(FileStreamEvent(f'stage {stage}: end', name))
        events.put(FileStreamEvent('finished', name))
    

    SSETest can then provide a shared instance of StreamSource to each lengthy_operation() call and SSETest.stage() can use StreamSource.get() to register a listener on this shared instance:

    class SSETest:
        _stream_source: StreamSource = StreamSource()
    
        @cherrypy.expose
        def index(self):
            return Path('SSEtest.html').read_text()
    
        @cherrypy.expose
        def upload(self, file):
            name = file.filename.encode('iso-8859-1').decode('utf-8')
            lengthy_operation(self._stream_source, name, file.file)
            return 'OK'
    
        @cherrypy.expose
        def stage(self):
            cherrypy.response.headers['Cache-Control'] = 'no-cache'
            cherrypy.response.headers['Content-Type'] = 'text/event-stream'
            def stream():
                yield from self._stream_source.get()
            return stream()
    
        stage._cp_config = {'response.stream': True}
    

    This is a complete[1] example of how to resolve your immediate question but you will most likely want to adapt this as you work closer to the final user experience you probably have in mind.

    [1]: I left out the imports for readability, so here they are:

    from pathlib import Path
    from queue import Queue
    from time import sleep
    from typing import BinaryIO, List
    import cherrypy
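
As mentioned at the start of this answer, you may later want to filter what each client receives. One possible shape for that, assuming each listener registers together with a predicate on the file name, is sketched below. The names `FilteredStreamSource`, `subscribe()` and `wants` are hypothetical; a real filter might key on a session id or a timestamp instead.

```python
from queue import Queue
from typing import Callable, List, Tuple

Predicate = Callable[[str], bool]


class FilteredStreamSource:
    def __init__(self) -> None:
        self.listeners: List[Tuple[Queue, Predicate]] = []

    def subscribe(self, wants: Predicate) -> Queue:
        """Register a listener that only receives events for matching names."""
        listener: Queue = Queue()
        self.listeners.append((listener, wants))
        return listener

    def put(self, name: str, message: str) -> None:
        payload = f'data: file: {name}: {message}\n\n'.encode('utf-8')
        for listener, wants in self.listeners:
            if wants(name):
                listener.put_nowait(payload)
```

A /stage handler could then build its predicate from request parameters or session data before subscribing.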
    

    Follow-on Exit Conditions

    Since you are using cherrypy.quickstart(), you will have to forcefully exit the SSETest service in the minimal solution above: StreamSource.get() loops forever, and I did not assume any graceful "stop" behavior for you. The basic solution points this out but, for the sake of readability, does not address it.

    Let's look at a couple ways to provide some initial graceful "stop" conditions:

    Add a stop condition to StreamSource

    First, at least add a reasonable stop condition to StreamSource. For instance, add a running attribute that allows the StreamSource.get() while loop to exit gracefully. Next, set a reasonable Queue.get() timeout so the loop can periodically test this running attribute between processing messages. Next, ensure at least some relevant CherryPy bus messages trigger this stop behavior. Below, I have rolled all of this behavior into the StreamSource class but you could also register a separate application level CherryPy plugin to handle calling into StreamSource.stop() rather than making StreamSource a plugin. I will demonstrate what that looks like when I add a separate signal handler.

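    # Additional imports needed from here on:
    from queue import Empty
    from cherrypy.process import plugins, wspbus

    class StreamSource(plugins.SimplePlugin):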
        def __init__(self, bus: wspbus.Bus):
            super().__init__(bus)
            self.subscribe()
            self.running = True
            self.listeners: List[Queue] = []
    
        def graceful(self):
            self.stop()
    
        def exit(self):
            self.stop()
    
        def stop(self):
            self.running = False
    
        def put(self, event: StreamEvent):
            for listener in self.listeners:
                listener.put_nowait(event)
    
        def get(self):
            listener = Queue()
            self.listeners.append(listener)
            try:
                while self.running:
                    try:
                        event = listener.get(timeout=1.0)
                        yield event.serialize()
                    except Empty:
                        pass
            finally:
                self.listeners.remove(listener)
    

    Now, SSETest will need to initialize StreamSource with a bus value since the class is now a SimplePlugin:

        _stream_source: StreamSource = StreamSource(cherrypy.engine)
    

    You will find that this solution gets you much closer to what you likely want in terms of user experience. Issue a keyboard interrupt and CherryPy will begin stopping the system. However, the first keyboard interrupt will not publish a stop message; for that you need to send a second keyboard interrupt.
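
A related refinement of the timeout loop in StreamSource.get() above: instead of doing nothing when the queue is empty, the loop could emit a comment line. In the event-stream format a line starting with a colon is a comment, which EventSource ignores, so it can serve as a keep-alive. The sketch below is standalone and assumes string messages; `stream_with_keep_alive`, `is_running` and `KEEP_ALIVE` are hypothetical names.

```python
from queue import Empty, Queue
from typing import Callable, Iterator

KEEP_ALIVE = b': keep-alive\n\n'  # comment line; clients ignore it


def stream_with_keep_alive(
    listener: Queue,
    is_running: Callable[[], bool],
    timeout: float = 1.0,
) -> Iterator[bytes]:
    """Yield queued messages, or a keep-alive comment when the queue stays empty."""
    while is_running():
        try:
            message = listener.get(timeout=timeout)
        except Empty:
            yield KEEP_ALIVE
        else:
            yield f'data: {message}\n\n'.encode('utf-8')
```

Periodic writes like this may also help the server notice clients that have gone away, although how quickly that happens depends on the server and any proxies in between.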

    Add a SIGINT handler to capture keyboard interrupts

    Due to the way cherrypy.quickstart works with signal handlers, you may then want to register a SIGINT handler as a CherryPy-compatible SignalHandler plugin to gracefully stop the StreamSource at the first keyboard interrupt.

    Here is an example:

    class SignalHandler(plugins.SignalHandler):
        def __init__(self, bus: wspbus.Bus, sse):
            super().__init__(bus)
            self.handlers = {
                'SIGINT': self.handle_SIGINT,
            }
            self.sse = sse
    
        def handle_SIGINT(self):
            self.sse.stop()
            raise KeyboardInterrupt()
    

    Note that in this case I am demonstrating a generic application level handler which you can then configure and initialize by altering your startup cherrypy.quickstart() logic as follows:

    sse = SSETest()
    SignalHandler(cherrypy.engine, sse).subscribe()
    cherrypy.quickstart(sse)
    

    For this example, I expose a generic application SSETest.stop method to encapsulate the desired behavior:

    class SSETest:
        _stream_source: StreamSource = StreamSource(cherrypy.engine)
    
        def stop(self):
            self._stream_source.stop()
    

    Wrap-up analysis

    I am not a CherryPy user and I only started looking at it for the first time yesterday just to answer your question, so I will leave "CherryPy best practices" up to your discretion.

    In reality, your problem is a very generic combination of the following Python questions:

    1. how can I implement a simple publish/subscribe pattern? (answered with Queue);
    2. how can I create an exit condition for the subscriber loop? (answered with Queue.get()'s timeout parameter and a running attribute)
    3. how can I influence the exit condition with keyboard interrupts? (answered with a CherryPy-specific signal handler, but this merely sits on top of concepts you will find in Python's built in signal module)
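
To illustrate the third point outside of CherryPy, here is a minimal sketch using only the standard signal and threading modules. The names `stop_requested`, `request_stop` and `install_handler` are hypothetical. A subscriber loop could test `stop_requested.is_set()` in the same place where the example above tests the running attribute.

```python
import signal
import threading

# Shared flag that long-running loops can poll between messages.
stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: record that a stop was requested and return quickly."""
    stop_requested.set()


def install_handler():
    """Route SIGINT (Ctrl+C) to request_stop; returns the previous handler.

    signal.signal() may only be called from the main thread.
    """
    return signal.signal(signal.SIGINT, request_stop)
```

Note that once a handler like this is installed, Ctrl+C no longer raises KeyboardInterrupt by itself, so something else has to act on the flag and shut the process down.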

    You can solve all of these questions in many ways and some lean more toward generic "Pythonic" solutions (my preference where it makes sense) while others leverage CherryPy-centric concepts (and that makes sense in cases where you want to augment CherryPy behavior rather than rewrite or break it).

    As an example, you could use CherryPy bus messages to convey stream messages, but to me that entangles your application logic a bit too much in CherryPy-specific features, so I would probably find a middle ground where you handle your application features generically (so as not to tie yourself to CherryPy) as seen in how my StreamSource example uses a standard Python Queue pattern. You could choose to make StreamSource a plugin so that it can respond to certain CherryPy bus messages directly (as I show above), or you could have a separate plugin that knows to call into the relevant application-specific domains such as StreamSource.stop() (similar to what I show with SignalHandler).

    Last, all of your questions are great, but they have all likely been answered before on SO as generic Python questions, so while I am tying the answers here to your CherryPy problem space I also want to help you (and future readers) realize how to think about these particular problems more abstractly beyond CherryPy.