Search code examples
javaspringspring-bootserver-sent-events

SSE in Java SpringBoot giving uneven chunks


I have an SSE endpoint in flask which emit SSE data chunks which are according MDN standards which ends in two \n characters, and when I received them on my frontend they received on the same format.

But I need to write a proxy for the same endpoint in spring boot which make a call on flask SSE endpoint and return same events to client. When I hit that endpoint using the same script, I received uneven chunks, for example in some chunks data: received and the value of that event received in the next chunk.

Please help me with this issue.

Sample Flask APP

import json
import time

from flask import Flask, Response
from flask_cors import CORS, cross_origin

app = Flask(__name__)

cors = CORS(
    app,
    origins="*",
)

def markdown_event():
    yield "data: "+ json.dumps({"data": "{\"RESPONSE\": \"well_structured_markdown_formatted_response_with_citation_without_appending_references_and_sources_and_additional_questions\", \"HEADING\": \"\", \"NEXT_HEADING\": \"suggestive_questions\", \"REFERENCE\": \"id_list\", \"IS_SORRY\": \"\", \"IS_RELEVANT\": \"\", \"LANGUAGE_OF_QUERY\": \"\", \"TEXT_CLASS\": \"\"}", "type": "schema"}) + "\n\n"
    chunks = ["{\"", "well", "_struct", "ured", "_mark", "down", "_formatted", "_response", "_with", "_c", "itation", "_without", "_app", "ending", "_references", "_and", "_sources", "_and", "_additional", "_questions", "\":", " \"", "Hello", " there", " [", "2", "]", " how", " can", " i", " be", " sure", " that", " [", "1", ",", "2", "]", " citation", " are", " being", " covered", " [", "1", " -", " ", "4", "],", " more", " citation", " formats", ":", " [", "1", ",", " ", "3", " -", " ", "5", "].", " Some", " more", " [", "1", "][", "4", "](", "3", ").", " The", " years", " are", " like", " [", "199", "5", " -", " ", "199", "8", "]", " []", " String", ":", " '", "Hello", " there", " [", "4", "]", " how", " can", " i", " be", " sure", " that", " (", "1", ",", "2", ")", " citation", " are", " being", ".", " Hello", " [", "202", "1", "]", " the", " ", " college", " year", " was", " [", "199", "2", "-", "96", "].", "\\", "n", "\\n", "The", " task", " is", " simple", " you", " should", " remove", " citations", " from", " the", " random", " above", " string", " in", " format", " like", " [", "xxx", "]", " [", "xxx", " -", " xxx", "],", " [", "xxx", " ,", " xxx", "],", " where", " every", " x", " is", " a", " digit", ";", " and", " not", " years", " in", " the", " following", " formats", " [", "xxxx", "],", " [", "xxxx", " -", " xxxx", "],", " [", "xxxx", " -", " xx", "]\\", "n", "\\n", "The", " output", " of", " the", " above", " string", " would", " be", ":\\", "n", "Hello", " there", " ", " how", " can", " i", " be", " sure", " that", " ", " citation", " are", " being", " covered", " ,", " more", " citation", " formats", ":", " .", " Some", " more", " .", " The", " years", " are", " like", " [", "199", "5", " -", " ", "199", "8", "]", " []", " String", ":", " '", "Hello", " there", " ", " how", " can", " i", " be", " sure", " that", " ", " citation", " are", " being", ".", " Hello", " [", "202", "1", "]", " the", " ", " college", " year", " was", " [", "199", "2", "-", "96", "].", "\\", "n", "\\n", "See", " that", " was", " easy", ".\",", " \"", "reference", "_ids", "_mapping", "\":", " [{\"", "citation", "_number", "\":", " ", "1", ",", " \"", "reference", "_id", "\":", " \"", "01", "HT", "G", "AA", "28", "DB", "H", "8", "A", "1", "ND", "P", "XY", "RK", "SW", "GE", "\"},", " {\"", "citation", "_number", "\":", " ", "2", ",", " \"", "reference", "_id", "\":", " \"", "01", "HP", "7", "CV", "V", "38", "J", "BC", "7", "D", "RA", "4", "EW", "QM", "FS", "71", "\"},", " {\"", "citation", "_number", "\":", " ", "3", ",", " \"", "reference", "_id", "\":", " \"", "01", "HP", "72", "M", "DS", "19", "J", "TF", "J", "Y", "WF", "6", "G", "5", "A", "AY", "7", "W", "\"}", "],", " \"", "suggest", "ive", "_questions", "\":", " [\"", "What", " are", " the", " benefits", " of", " Q", "IV", " in", " children", " aged", " ", "6", " months", " to", " ", "17", " years", "?\",", " \"", "Can", " Q", "IV", " be", " administered", " to", " children", " aged", " ", "6", " months", " and", " above", "?\",", " \"", "What", " is", " the", " safety", " profile", " of", " Q", "IV", " in", " children", " aged", " ", "6", "-", "35", " months", "?", "\"]}"]

    for chunk in chunks:
        yield "data: "+json.dumps({"data": chunk, "type":"function_call"}) + "\n\n"
        time.sleep(0.1)
    
@app.route("/")
def index():
    return "Hello, go to /markdown endpoint"

@app.route("/markdown", methods=["GET", "POST"])
def markdown():
    return Response(markdown_event(), mimetype="text/event-stream")

if __name__ == "__main__":
    app.run(port=5500)

cmd to run above flask application:

python -m flask --app=app run --port=5500 --host=0.0.0.0

Sample client :

async function sse() {
    var requestOptions = {
        method: "POST",
        redirect: "follow",
    };
    const response = await fetch(
        "http://localhost:5000/proxy-sse-2",
        // "http://localhost:5500/markdown",
        requestOptions
    );
    console.log(response.status)
    console.log(response.headers)
    count = 1
    const reader = response.body
        .pipeThrough(new TextDecoderStream())
        .getReader();
    while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        console.log("Chunk: ", count, " *********************************");
        const a = String.raw`${value}`;
        console.log("Received", a);
        console.log("--------------------------------");
        count += 1;
    }
    console.log(count);
}
sse()

Java spring boot controller with SseEmitter:

package com.spring.demo.controller;

import java.io.IOException;
import java.time.LocalDateTime;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;

@RestController
public class HelloWorldController {

    private final String targetSseUrl = "http://localhost:5500/";

    private final WebClient webClient;

    public HelloWorldController(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.baseUrl(targetSseUrl).build();
    }

    @PostMapping("/proxy-sse")
    public SseEmitter proxySse() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        // Call the original SSE endpoint and forward events to the client
        webClient.post()
                .uri("/markdown")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class)
                .subscribe(arg0 -> {
                    try {
                        System.err.println(arg0);
                        emitter.send(SseEmitter.event().data(arg0));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }, emitter::completeWithError, emitter::complete);

        return emitter;
    }

    @PostMapping("/local-sse")
    public SseEmitter proxySse1() {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        emitter.onTimeout(() -> System.out.println("SSE connection timed out"));
        emitter.onCompletion(() -> System.out.println("SSE connection closed"));

        new Thread(() -> {
            int count = 0;
            while (true) {
                count++;
                try {
                    String data = "Event at: " + LocalDateTime.now();
                    SseEventBuilder eventBuilder = SseEmitter.event().data(data);
                    emitter.send(eventBuilder);
                    Thread.sleep(100); // Send event every 100ms (adjust as needed)
                } catch (Exception e) {
                    emitter.completeWithError(e);
                    break;
                }
                if (count == 100) { // Number of events to be send (adjust as needed)
                    emitter.complete();
                    break;
                }
            }
        }).start();

        return emitter;
    }
}

Above code block have two endpoints, one which make a call on flask endpoint and emit data received from that and another endpoint generate data and omit to client.

When I make a call on flask endpoint directly I received complete chunks but when I make call on any of the java endpoint I received uneven broken chunks.

Result for all endpoints:

  1. flask endpoint
  2. java proxy SSE endpoint
  3. java local SSE endpoint

Please help me how to fix this issue.


Solution

  • This behavior is in the implementation of the SseEmitter in spring, but it works with JavaScript’s default EventSource and also works with extended-eventsource if you want to make POST request to SSE endpoint.

    Can find more details here.