I am trying to receive messages about a running job from a RabbitMQ queue, save them to a database and, at the same time, send Server-Sent Events to a waiting client to inform it of the state of the running job.
The events are sent from a java.util.concurrent.SubmissionPublisher progressPublisher, received on a @Controller method (where JobProgress is a small record containing a status code and possibly a message):
    @GetMapping("/{id}/progress/stream")
    public Flux<ServerSentEvent<JobProgress>> subscribeToProgress(@PathVariable("id") String id) {
        return Flux.merge(
            // Progress events for the requested job only
            JdkFlowAdapter.flowPublisherToFlux(progressPublisher)
                .filter(progress -> progress.id().equals(id))
                .map(progress -> ServerSentEvent.<JobProgress>builder()
                    .id(progress.id())
                    .event("job-progress")
                    .data(progress)
                    .retry(Duration.ofSeconds(10))
                    .build()),
            // Periodic keep-alive comment
            Flux.interval(Duration.ofMillis(500L))
                .map(sequence -> ServerSentEvent.<JobProgress>builder()
                    .event("keep-alive")
                    .comment("Keeping the connection alive to avoid abrupt closing.")
                    .retry(Duration.ofSeconds(1))
                    .build())
        );
    }
The configuration for the publisher:
    @Configuration
    public class PublisherConfiguration {

        @Bean
        public SubmissionPublisher<JobProgress> progressSink() {
            // Diamond operator instead of the raw type
            return new SubmissionPublisher<>();
        }
    }
And the usage of it:
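    @Component
    public class JobEventPublisher {

        private final SubmissionPublisher<JobProgress> progressPublisher;

        // Constructor injection: a final field cannot be populated by field-level @Autowired
        @Autowired
        public JobEventPublisher(SubmissionPublisher<JobProgress> progressPublisher) {
            this.progressPublisher = progressPublisher;
        }

        public void dispatchToClient(JobProgress progress) {
            progressPublisher.submit(progress);
        }
    }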
I've done some debugging and logging, and found that the events are sent through the Flux (or at least mapped to a ServerSentEvent; I haven't checked with Wireshark yet).
This is the code that receives them (TypeScript/React):
    useEffect(() => {
        const id = "some-uuid-received-on-post";
        const source = new EventSource(`${baseUrl}/api/v1/job/${id}/progress/stream`, {withCredentials: true});
        // The actual logic is irrelevant, this is never triggered.
        source.onmessage = console.log;
        // Wrap close() so it keeps its EventSource `this` binding
        return () => source.close();
    }, [baseUrl]);
The EventSource request in the network tab stays pending indefinitely and never receives a first message. There is a Spring Cloud Gateway between the client and the server/microservice, but I don't think that should matter, as this is standard HTTP and not WebSocket.
I've tried WebSocket as well, but had just as many errors with that (different ones), and I don't really need two-way communication, so SSE is what we want to use.
Most of my searching has led nowhere. The retry setting and the keep-alive message were things I tried in order to get it working, but neither helped.
I also have the problem that the events don't always reach the Controller method, but I think this is unrelated? If there is a better way than using the SubmissionPublisher, do tell. I also tried a Sinks.Many, with no success.
I'll happily accept comments with better ways to do this if you already have a working solution; I've been at this for way too long and really need to close this ticket.
I'd suggest testing the backend and the frontend separately.
For the backend, if it's working properly, you should be able to open your SSE URL directly in a browser (e.g. http://localhost:8080/api/v1/job/123/progress/stream) and see the event data appearing on the blank page.
One note here: you might also need to explicitly specify the produced content type in your controller, like so:

    @GetMapping(path = "...", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
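If you'd rather check the backend without a browser, a small stand-alone Java client can dump the raw stream and the response Content-Type. This is only a sketch, assuming Java 16+ and no extra libraries. The names SseProbe, SseEvent and parse are made up for illustration, the URL is the placeholder from the example above, and the parser is deliberately simplified (it ignores id: and retry: fields and trims all surrounding whitespace).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SseProbe {

    /** One parsed event: its name ("message" when no event: field was sent) and its data. */
    record SseEvent(String name, String data) {}

    /** Groups raw text/event-stream lines into events; a blank line ends an event. */
    static List<SseEvent> parse(Stream<String> lines) {
        List<SseEvent> events = new ArrayList<>();
        String[] current = {"message", null}; // [event name, data]
        lines.forEach(line -> {
            if (line.isEmpty()) {
                // Events without any data are not dispatched
                if (current[1] != null) {
                    events.add(new SseEvent(current[0], current[1]));
                }
                current[0] = "message";
                current[1] = null;
            } else if (line.startsWith("event:")) {
                current[0] = line.substring(6).strip();
            } else if (line.startsWith("data:")) {
                String chunk = line.substring(5).strip();
                current[1] = current[1] == null ? chunk : current[1] + "\n" + chunk;
            }
            // Lines starting with ':' are comments; id: and retry: are ignored in this sketch.
        });
        return events;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder URL (assumption): adjust host, port and job id to your setup.
        URI uri = URI.create(args.length > 0
                ? args[0]
                : "http://localhost:8080/api/v1/job/123/progress/stream");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .build();
        HttpResponse<Stream<String>> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofLines());
        System.out.println("Content-Type: "
                + response.headers().firstValue("Content-Type").orElse("(none)"));
        // Prints the raw lines as they arrive; stop with Ctrl+C.
        response.body().forEach(System.out::println);
    }
}
```

If the raw output contains event:job-progress lines, keep in mind that the browser's EventSource.onmessage only fires for events that have no event: field (or whose type is message). Named events have to be picked up with source.addEventListener("job-progress", handler), so that may be worth checking on the frontend side.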
For the frontend, you can point your EventSource to the URL of a mock SSE tool such as https://sse.dev/test and see if the events are received properly.
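On the side question of events not always reaching the controller method: one possible cause (not confirmed for your setup) is that a SubmissionPublisher only delivers items to subscribers that are subscribed at the moment submit is called. Anything submitted before the client's request has subscribed is simply not seen by that client. A minimal sketch of that behaviour using only the JDK; the class names LateSubscriberDemo and CollectingSubscriber are made up for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class LateSubscriberDemo {

    /** Collects every item it receives and signals when the publisher terminates. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Submits one item before and one item after subscribing, then returns what arrived. */
    static List<String> run() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.submit("early");       // no subscriber yet: the item is dropped
            publisher.subscribe(subscriber);
            publisher.submit("late");        // delivered to the current subscriber
        } // close() signals onComplete after pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [late]
    }
}
```

If this turns out to be what happens in your case, sending the last saved state from the database when the client subscribes, or using a replaying sink such as Reactor's Sinks.many().replay(), might be options to look at.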