Search code examples
spring-bootspring-webfluxreactiveserver-sent-events

Publishing events from RabbitMQ queue to Reactive ServerSentEvents Flux in Spring Boot


I am trying to receive some messages regarding a running job on a RabbitMQ queue, save them to a database and at the same time, send some Server Sent Events to a waiting client to inform of the state of the running job.

The events are sent from a java.util.concurrent.SubmissionPublisher progressPublisher, received on a @Controller method (where JobProgress is a small record containing a status code and possibly a message):

@GetMapping("/{id}/progress/stream")
public Flux<ServerSentEvent<JobProgress>> subscribeToProgress(@PathVariable("id") String id) {
  return Flux.merge(JdkFlowAdapter.flowPublisherToFlux(progressPublisher)
    .filter(progress -> progress.id().equals(id))
    .map(progress -> ServerSentEvent.<JobProgress>builder()
      .id(progress.id())
      .event("job-progress")
      .data(progress)
      .retry(Duration.ofSeconds(10))
      .build()
    ), 
    Flux.interval(Duration.ofMillis(500L))
      .map(sequence -> ServerSentEvent.<JobProgress>builder()
        .event("keep-alive")
        .comment("Keeping the connection alive to avoid abrupt closing.")
        .retry(Duration.ofSeconds(1))
        .build()))
  );
}

The configuration for the publisher:

@Configuration
public class PublisherConfiguration {
  @Bean
  public SubmissionPublisher<JobProgress> progressSink() {
    return new SubmissionPublisher();
  }
}

And the usage of it:

@Component
public class JobEventPublisher {
  @Autowired
  private final SubmissionPublisher<JobProgress> progressPublisher;

  public void dispatchToClient(JobProgress progress) {
    progressPublisher.submit(progress);
  }
}

I've done some debugging and logging, and find that the events are sent through the flux (or at least mapped to a ServerSentEvent, haven't checked with WireShark yet).

This is the code to receive it (Typescript/React):

useEffect(() => {
  const id = "some-uuid-received-on-post";
  const source = new EventSource(`${baseUrl}/api/v1/job/${id}/progress/stream`, {withCredentials: true});

  // The actual logic is irrelevant, this is never triggered.
  source.onmessage = console.log;

 return source.close;
}, [base

Url]);

The EventSource in the network tab never stops pending, and never receives a first message. There is a Spring Boot Cloud Gateway between the client and the server/microservice, but I don't think that should matter, as this is standard HTTP, and not Websocket.

I've tried WebSocket as well, but had just as much errors with that (different ones), and don't really need two-way communication, so this is what we want to use.

Most of my googling has turned up dead, but the retry and the keep-alive message were some things I tried to get it to work, but neither did.

I also have the problem that the events doesn't always reach the Controller method, but I think this is unrelated? If there is a better way than using the SubmissionPublisher, do tell. I also tried a Sinks.Many, with no success.

I'll happily accept comments with better ways to do this if you already have a working solution, I've been at this for way too long and really need to close this ticket.


Solution

  • I'd suggest testing the backend and the frontend separately.

    For the backend, if it's working properly, you should be able to open your SSE url directly in a browser (i.e. http://localhost:8080/api/v1/job/123/progress/stream) and see the event data appearing on the blank page.

    One note here, you might also need to explicitly specify the produced content type in your controller like so:

    @GetMapping(path = "...", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    

    For the frontend, you can point your EventSource to an url of some mock SSE tool like https://sse.dev/test and see if the events are received properly.