Search code examples

Spring Boot, reactive streams hot publisher and GraphQL subscriptions. Doesn't work as expected

I'm using spring boot v3.0.2 with websockets and faced problem with graphQL subscriptions. The task is pretty simple - notify client when backend entity changes. So, I have EntityChangeNotifier -

public class EntityChangeNotifier {

    private final Collection<Listener<MyEntity>> listeners = new ArrayList<>();

    public void notifyChange(MyEntity entity) {
        listeners.forEach(listener -> listener.onEntityChange(entity));

    public void registerListener(Listener<T> listener) {

    interface Listener<T> {
        void onEntityChange(T entity);


Service class -

public class Service {

    private final MyRepository repository;

    private final EntityChangeNotifier entityChangeNotifier;

    public Service(MyRepository repository, EntityChangeNotifier entityChangeNotifier) {
        this.repository = repository;
        this.entityChangeNotifier = entityChangeNotifier;

    public long saveEntity(MyEntity entity) {
        MyEntity saved =;
        return saved.getId();

and GraphQL mappings -

public class Mappings {

    private final EntityChangeNotifier entityChangeNotifier;
    private final Service service;

    public Mappings(Service service, EntityChangeNotifier entityChangeNotifier) {
        this.service = service;
        this.entityChangeNotifier = entityChangeNotifier;

    public Publisher<MyEntity> changed() {
        return Flux.create(fluxSink -> entityChangeNotifier.registerListener(fluxSink::next));

    public Long create() {
        return service.saveEntity(new MyEntity(0, "testName"));

GraphQL schema -

type Mutation {
    create: Int!

type Subscription {
    changed: MyEntity

type MyEntity {

The code is very basic, but the problem is it doesn't work. I test it via graphiQL and after saving entity, subscription just ends without any result. As far as I discover, sink is marked as cancelled for some reason. Could you please hint me what is wrong? Btw - when I use cold publisher all works fine. But such code, obviously, doesn't comfort me -

    public Publisher<MyEntity> test() {
        Random random = new Random();
        return Flux.interval(Duration.ofSeconds(5))
                .map(i -> new MyEntity(0, "name" + random.nextInt()));


  • Solved. The problem was in... special test conditions and graphiQL subscriptions handling. It just cancel subscription when you switch tab - so all is fine