Search code examples
javaspring-bootgraphqlproject-reactorgraphql-subscriptions

Spring Boot, reactive streams hot publisher and GraphQL subscriptions. Doesn't work as expected


I'm using spring boot v3.0.2 with websockets and faced problem with graphQL subscriptions. The task is pretty simple - notify client when backend entity changes. So, I have EntityChangeNotifier -

@Service
public class EntityChangeNotifier {

    private final Collection<Listener<MyEntity>> listeners = new ArrayList<>();

    @Override
    public void notifyChange(MyEntity entity) {
        listeners.forEach(listener -> listener.onEntityChange(entity));
    }

    @Override
    public void registerListener(Listener<T> listener) {
        listeners.add(listener);
    }

    interface Listener<T> {
        void onEntityChange(T entity);
    }

}

Service class -

@Service
public class Service {

    private final MyRepository repository;

    private final EntityChangeNotifier entityChangeNotifier;

    @Autowired
    public Service(MyRepository repository, EntityChangeNotifier entityChangeNotifier) {
        this.repository = repository;
        this.entityChangeNotifier = entityChangeNotifier;
    }

    @Transactional
    public long saveEntity(MyEntity entity) {
        MyEntity saved = repository.save(entity);
        entityChangeNotifier.notifyChange(entity);
        return saved.getId();
    }

and GraphQL mappings -

@Controller
public class Mappings {

    private final EntityChangeNotifier entityChangeNotifier;
    private final Service service;

    @Autowired
    public Mappings(Service service, EntityChangeNotifier entityChangeNotifier) {
        this.service = service;
        this.entityChangeNotifier = entityChangeNotifier;
    }

    @SubscriptionMapping
    public Publisher<MyEntity> changed() {
        return Flux.create(fluxSink -> entityChangeNotifier.registerListener(fluxSink::next));
    }

    @MutationMapping
    public Long create() {
        return service.saveEntity(new MyEntity(0, "testName"));
    }
}

GraphQL schema -

type Mutation {
    create: Int!
}

type Subscription {
    changed: MyEntity
}

type MyEntity {
    id
    name
}

The code is very basic, but the problem is it doesn't work. I test it via graphiQL and after saving entity, subscription just ends without any result. As far as I discover, sink is marked as cancelled for some reason. Could you please hint me what is wrong? Btw - when I use cold publisher all works fine. But such code, obviously, doesn't comfort me -

    @SubscriptionMapping
    public Publisher<MyEntity> test() {
        Random random = new Random();
        return Flux.interval(Duration.ofSeconds(5))
                .map(i -> new MyEntity(0, "name" + random.nextInt()));
    }


Solution

  • Solved. The problem was in... special test conditions and graphiQL subscriptions handling. It just cancel subscription when you switch tab - so all is fine