
How to remove a topic when it has no subscribers?


I am building a Diffusion solution that creates a tree of topics.

I am creating topics on demand to reflect values received from a RabbitMQ feed. Each topic has a memory cost, so I am looking to remove the topic once it has had no subscribers for some time.

How can this be done with the Unified Java API?


Solution

  • The TopicEventListener (part of the TopicControl feature) provides this functionality. It has separate callbacks for when a topic has no subscribers and for when it has at least one subscriber.

    Example:

        public TopicEventListenerClient() {
            // Connect as a control client and obtain the TopicControl feature.
            session = Diffusion.sessions().principal("admin").password("password").open("ws://localhost:8080");
            topicControl = session.feature(TopicControl.class);

            // Listen for subscription events on the given topic path.
            topicControl.addTopicEventListener("rabbitMQ/foo", new TopicEventListener() {

                @Override
                public void onClose(String topicPath) {
                    LOG.info("Listener closed");
                }

                @Override
                public void onError(String topicPath, ErrorReason errorReason) {
                    LOG.info("Error on listener: " + errorReason);
                }

                @Override
                public void onRegistered(String topicPath, Registration registration) {
                    LOG.info("Listener registered");
                }

                @Override
                public void onHasSubscribers(String topicPath) {
                    LOG.info("Topic: " + topicPath + " has at least 1 subscriber");
                }

                @Override
                public void onNoSubscribers(String topicPath) {
                    // The last subscriber has gone; this is the point at which the topic could be removed.
                    LOG.info("Topic: " + topicPath + " has no subscribers");
                }
            });
        }
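
    The listener only reports the moment a topic loses its last subscriber; the question asks for removal after the topic has been idle "for some time". A minimal sketch of that grace period, in plain Java, might look like the following. The IdleTopicReaper class, its method names and the removeAction callback are all hypothetical (none of them are part of the Diffusion API); the actual removal call is left to the caller.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    /** Hypothetical helper: runs removeAction once a topic has had no subscribers for graceMillis. */
    final class IdleTopicReaper {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        private final Map<String, ScheduledFuture<?>> pending = new HashMap<>();
        private final long graceMillis;
        private final Consumer<String> removeAction;

        IdleTopicReaper(long graceMillis, Consumer<String> removeAction) {
            this.graceMillis = graceMillis;
            this.removeAction = removeAction;
        }

        /** Call from onNoSubscribers: start (or restart) the countdown for this topic. */
        synchronized void noSubscribers(String topicPath) {
            cancelPending(topicPath);
            pending.put(topicPath, scheduler.schedule(
                () -> expire(topicPath), graceMillis, TimeUnit.MILLISECONDS));
        }

        /** Call from onHasSubscribers: a subscriber is back, so abandon the countdown. */
        synchronized void hasSubscribers(String topicPath) {
            cancelPending(topicPath);
        }

        /** Stops the scheduler thread; countdowns that have not fired are dropped. */
        void shutdown() {
            scheduler.shutdownNow();
        }

        private void cancelPending(String topicPath) {
            ScheduledFuture<?> task = pending.remove(topicPath);
            if (task != null) {
                task.cancel(false);
            }
        }

        private void expire(String topicPath) {
            synchronized (this) {
                if (pending.remove(topicPath) == null) {
                    return; // the countdown was cancelled just before it fired
                }
            }
            removeAction.accept(topicPath);
        }
    }
    ```

    With this in place, onNoSubscribers would call reaper.noSubscribers(topicPath) and onHasSubscribers would call reaper.hasSubscribers(topicPath), while removeAction wraps whatever topic-removal call your Diffusion version provides.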
    

    Hope this helps!

    EDIT: 04/10/2019

    TopicEventListener is deprecated (since Diffusion version 6.1). The preferred method of removing a topic with no subscribers is Automatic Topic Removal. This can be achieved by specifying a Removal Policy as part of the topic's specification.

    For example:

        final Session session = Diffusion.sessions().principal("admin").password("password").open("ws://localhost:8080");

        final TopicControl topicControl = session.feature(TopicControl.class);

        final TopicSpecification specification =
            topicControl.newSpecification(TopicType.JSON)
                .withProperty(TopicSpecification.REMOVAL, "when subscriptions < 1 for 10s");
    

    The above code creates a topic specification for a JSON topic that will be automatically removed if it has no subscribers for 10 seconds. The policy takes effect once a topic is added using this specification.
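
    If the idle period comes from configuration rather than a literal, the policy string can be built from a java.time.Duration. The sketch below is only an illustration: RemovalPolicies and whenUnsubscribedFor are hypothetical names, and it emits whole seconds only, matching the "10s" form used above.

    ```java
    import java.time.Duration;

    /** Hypothetical helper for building removal policy strings. */
    final class RemovalPolicies {
        private RemovalPolicies() {
        }

        /** Builds a policy of the form "when subscriptions < 1 for 10s". */
        static String whenUnsubscribedFor(Duration idle) {
            long seconds = idle.getSeconds();
            if (seconds < 1) {
                // Sub-second and negative durations cannot be expressed in whole seconds.
                throw new IllegalArgumentException("idle period must be at least 1 second");
            }
            return "when subscriptions < 1 for " + seconds + "s";
        }
    }
    ```

    The result would then be passed as the property value, for example .withProperty(TopicSpecification.REMOVAL, RemovalPolicies.whenUnsubscribedFor(Duration.ofSeconds(10))).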